In [1]:
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
from math import log, e
from sklearn.decomposition import PCA
from scipy.integrate import solve_ivp
from scipy.stats import linregress
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

In [2]:
def frobnorm(mat): 
    r"""
        Compute the frobenius norm 
    """
    n, m = len(mat), len(mat[0])
    matsum = 0
    for i in range(0,n): 
        for j in range(0,m): 
            matsum += mat[i][j]**2

    return matsum**0.5


def cossim(a,b): 
    r"""
        Compute the cossine similarity
    """
    return np.trace(a.T @ b) / (frobnorm(a)*frobnorm(b))

In [3]:
def ell_inf(A, B): # chebyshev norm
    return max(abs(a-b) for a,b in zip(A,B))

def cor_sum(vec, r): # correlation sum
    r"""
        Compute the correlation sum given some radius r
    """
    
    n = len(vec)
    neighbor_counts = [
        sum(ell_inf(x_i, x_j) <= r for x_j in vec) / n
        for x_i in vec
    ]

    return sum(neighbor_counts) / n

def approx_entropy(time_series, m, d, r): 
    """
        Estimate the approximate/discrete entropy of a time series (approximating the growth rate of r-balls). 

        Params: 

        time_series: 1d array_like 
        m, d: int
            delay embedding parameters
        r: float
            radius determining the size of an r-ball
    """
    
    def phi(delay_dim, delay): 
        """
            Compute the log-average of r-balls assuming the delay embedding parameters result in a diffeomorphism 
            of the 1d time series from R^1 to R^d. 
        """
        X = delay_embed(time_series, delay_dim, delay)
        n = len(X)
        neighbor_counts = [
            sum(ell_inf(x_i, x_j) <= r for x_j in X) / n
            for x_i in X
        ]
        count = [c for c in neighbor_counts if c > 0]
        return sum(math.log(c) for c in count) / n

    return abs(phi(m+1, d) - phi(m, d))

def cor_dim(time_series, m, d, r): # correlation dimension; easier to compute than minkowski dimension
    """
        Estimate the correlation dimension of a time series
    """
    X = delay_embed(time_series, m, d)
    CS = cor_sum(X, r)

    return math.log(CS) / math.log(r)

In [4]:
# def AutoCor_k(arr, k): 
#     n = len(arr)
#     M = np.mean(arr)
#     num, denom = 0, 0
#     for i in range(0, n):
#         denom+=(arr[i]-M)**2
#     for i in range(0, n-k): 
#         num+=(arr[i]-M)*(arr[i+k]-M)

#     return num/denom

def acf(x, lag, unbiased=True): # autocorrelation function; used to determine the optimal time delay
    r"""
        Compute the autocorrelation of a signal given delay/lag k. Can be used to determine the optimal time delay. 

        Params:

        x: 1d array_like input data
        lag: integer 
    """
    x = np.asarray(x, dtype=float).ravel()
    x = x-x.mean()
    n = x.size

    num = np.dot(x[:n-lag], x[lag:])
    denom = np.dot(x, x)

    if unbiased: 
        num /= (n-lag)
        denom /= n
    else: 
        num /= n
        denom /= n

    return num / denom

def plot_acf(x, max_lag, unbiased=True): 
    r"""
        Plot ACF from delay/lag = 1 to max_lag k. Returns the list [acf_i] where i is the delay parameter. 

        Params:

        max_lag: int
    """
    x = np.asarray(x, dtype=float).ravel()
    acf_list = [acf(x, i, unbiased=unbiased) for i in range(max_lag)]

    fig, ax = plt.subplots(figsize=(10,6))
    lags = range(max_lag)
    ax.plot(lags, acf_list, 'bo-', markersize=4, linewidth=1)
    ax.axhline(y=0, color='black', linestyle='-', alpha=0.3)

    ax.set_xlabel('Lag')
    ax.set_ylabel('Autocorrelation')
    ax.set_title('Autocorrelation Function over Lag')
    ax.grid(True, alpha=0.3)
    ax.legend()

    ax.set_ylim(-1.2, 1.2)
    plt.tight_layout()
    plt.show()

    return acf_list

In [5]:
def p_hist(x, bins='fd', base=e): 
    r"""
        Compute the probability distribution from histogram. Returns the probability and edges (from the hist). 

        Params: 
        
        x: array-like input data
        bins: see numpy documentation. fd: Freedman Diaconis estimator
    """
    
    x = np.asarray(x, dtype=float).ravel()
    edges = np.histogram_bin_edges(x, bins=bins)
    counts, _ = np.histogram(x, bins=edges)
    total = counts.sum()

    if total == 0: 
        return np.array([1.0]), edges

    p = counts.astype(float) / total

    return p, edges

def joint_p_hist(x, y, bins='fd', base=e, edge_x=None, edge_y=None):
    r"""
        Compute the joint probability distribution P_xy from histogram.

        Params: 

        x, y: array_like input data
        edge_x, edge_y: predefined edges of hist_x and hist_y. 
            Specify edges to ensure consistency. 
    """

    x = np.asarray(x, dtype=float).ravel()
    y = np.asarray(y, dtype=float).ravel()

    if edge_x is None: 
        edge_x = np.histogram_bin_edges(x, bins=bins)
    if edge_y is None: 
        edge_y = np.histogram_bin_edges(y, bins=bins)

    counts, _, _ = np.hisogram2d(x, y, bins=(edge_x, edge_y))
    total = counts.sum()

    if total == 0: 
        return np.array([[1.0]], edge_x, edge_y)

    p = counts.astype(float) / total

    return p, edge_x, edge_y

def H_from_p(p, base=e): 
    r"""
        Compute the shannon entropy given a probability distribution p. 
    """
    p = np.asarray(p, dtype=float)
    p = p[p > 0]

    if p.size == 0: 
        return 0.0
        
    return -np.sum(p * (np.log(p) / np.log(base)))

In [6]:
def delay_embed(time_series, embedding_dimension, time_delay):
    r"""
        Delay-embed a 1-dimensional seqeunce (time series). Returns R^{N-t(d-1) x d} delay embedding. 

        time_series: array-like input data
        embedding_dimension (d), time_delay (tau): delay-embedding parameters. 

        Original 1-d data transformed as [[x_0, x_t, ..., x_t(d-1)], ....]
    """
    N = len(time_series)
    num_vecs = N - (embedding_dimension - 1) * time_delay
    embedded = np.zeros((num_vecs, embedding_dimension))
    for i in range(num_vecs):
        for j in range(embedding_dimension):
            embedded[i, j] = time_series[i + j * time_delay]

    # delay_matrix = num_vecs**(-0.5)*embedded
    
    return embedded

def rolling_cor(x, y, dim, delay): 
    r"""
        Compute the rolling correlation between two delay-embedded time series. 
    """
    x_embed = delay_embed(x, dim, delay)
    y_embed = delay_embed(y, dim, delay)
    cors = []
    for i in range(len(x_embed)): 
        cor_mat = np.corrcoef(x_embed[i], y_embed[i])
        cors.append(cor_mat[0,1])

    return np.array(cors)

# Old --------
def shannon_entropy(x, bins='fd', base=e, from_probs=False): 
    x = np.asarray(x, dtype=float)

    if from_probs: 
        p = x/x.sum()
    else: 
        edges = np.histogram_bin_edges(x, bins=bins)
        counts, _ = np.histogram(x, bins=edges)
        p = counts / counts.sum()

    p = [p > 0]
    H = -np.sum(p * (np.log(p) / np.log(base)))

    return H

def joint_entropy(x, y, bins='fd', base=e, from_probs=False):
    x, y = np.asarray(x, dtype=float), np.asarray(y, dtype=float)

    if prob_probs: 
        p_xy = x / x.sum()
    else: 
        edges_x = np.histogram_bin_edges(x, bins=bins)
        edges_y = np.histogram_bin_edges(y, bins=bins)
        counts, _, _ = np.histogram2d(x, y, bins=[edges_x, edges_y]) 
        p_xy = counts / counts.sum()

    p_xy = p_xy.flatten()
    p_xy = p_xy[p_xy > 0]

    H_xy = -np.sum(p_xy * (np.log(p_xy) / np.log(base)))
    
    return Hxy

def conditional_entropy(x, y, bins='fd', base=e, from_probs=False): 
    H_xy = join_entropy(x, y, bins=bins, base=base, from_probs=from_probs)
    H_y = shannon_entropy(y, bins=bins, base=base, from_probs=from_probs)

    return H_xy - H_y

def mutual_info(x, y, bins='fd', base=e, from_probs=False): 
    H_x = shannon_entropy(x, bins=bins, base=base, from_probs=from_probs)
    H_y = shannon_entropy(y, bins=bins, base=base, from_probs=from_probs)
    H_xy = join_entropy(x, y, bins=bins, base=base, from_probs=from_probs)

    return H_x + H_y - H_xy

    
# mutual information estimate based on historgram
def ami(x, max_lag, bins='fd', base=e):
    x = np.asarray(x, dtype=float)
    N = x.size
    lags = np.arange(1, max_lag+1)
    ami = np.empty_like(lags, dtype=float)

    Hx = shannon_entropy(x, bins=bins, base=base)

    for idx, tau in enumerate(lags): 
        x1, x2 = x[:-tau], x[tau:]
        
    return lags, ami
# ------------

def delayPCA(time_series, delay_dim, delay, pca_dim): 
    ts_emb = delay_embed(time_series, delay_dim, delay)
    pca = PCA(pca_dim).fit(ts_emb)
    ts_delaypca = pca.transform(ts_emb) 

    return ts_delaypca, pca

def project_pca(data, pca, numcomps=2): 
    x_proj = (data-pca.mean_) @ pca.components_[0:numcomps].T
    interval = np.arange(len(x_proj))
    if numcomps == 2: 
        plt.scatter(x_proj[:,0], x_proj[:,1], c=interval, cmap='viridis')
        plt.xlabel("PC 1"); plt.ylabel("PC 2")
    elif numcomps == 3: 
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')
        ax.scatter(x_proj[:,0], x_proj[:,1], x_proj[:,2], c=interval, cmap='viridis', **scatter_kw)
        ax.set_xlabel("PC 1"); ax.set_ylabel("PC 2"); ax.set_zlabel("PC 3")
    plt.title(f"Projection onto first {numcomps} PCs")
    plt.tight_layout(); plt.show()
        
    return x_proj

def update_delayPCA(oldata, newobs, dim_delay, delay, dim_pca): 
    # make sure dim_pca is less than dim_delay
    # this is a horrible piece of code; remind myself to fix this
    assert dim_delay > dim_pca, "PCA dim must be less than delay dim"
    
    pca = PCA(dim_pca)
    oldata_emb = delay_embed(oldata, dim_delay, delay)
    old_pca = pca.fit(oldata_emb) 
    
    newdata = np.hstack((oldata, newobs))
    newdata_emb = delay_embed(newdata, dim_delay, delay)
    new_pca = pca.fit(newdata_emb)
    newdata_delaypca = new_pca.transform(newdata_emb)
    newdata_oldbasis = old_pca.transform(newdata_emb)
    
    return old_pca, new_pca, newdata_delaypca, newdata_oldbasis

def return_data_stats(data, dim, delay, r): 
    mean, std, ent, cordim = np.mean(data), np.std(data), approx_entropy(data, dim, delay, r), cor_dim(data, dim, delay, r)
    print(f"Data Mean: {mean} Data Stdev: {std} \nData Entropy: {ent} Data Correlation Dim: {cordim}")

    return mean, std, ent, cordim

def pca_diff_stats(pca1, pca2, ncomps): 
    angdiff = cossim(pca1.components_[:ncomps], pca2.components_[:ncomps])
    meandiff = np.abs(pca1.mean_ - pca2.mean_)
    print(f"Mean Difference: {meandiff} Cosine Similarity: {angdiff}")
    plt.figure(figsize=(8,8))
    plt.plot(pca1.explained_variance_, label=f'{pca1} Eigenvalues')
    plt.plot(pca2.explained_variance_, label=f'{pca2} Eigenvalues')
    plt.legend()
    plt.show()

    return angdiff, meandiff

def compare_pcas(pca1, pca2, k, X1=None, X2=None):
    from scipy.linalg import subspace_angles
    U1, U2 = pca1.components_[:k].T, pca2.components_[:k].T
    ang = subspace_angles(U1, U2)
    subspace_dist = np.linalg.norm(np.sin(ang))
    var_gap = (pca1.explained_variance_ratio_[:k].cumsum() -
               pca2.explained_variance_ratio_[:k].cumsum())[-1]
    overlap_sv = np.linalg.svd(U1.T @ U2, compute_uv=False)

    results = dict(
        principal_angles_deg=np.degrees(ang),
        grassmann_dist=subspace_dist,
        cum_variance_gap=var_gap,
        overlap_singular_vals=overlap_sv,
    )
    if X1 is not None and X2 is not None:
        results["recon_MSE_1to2"] = pca_cross_recon_loss(pca1, pca2, X2)
        results["recon_MSE_2to1"] = pca_cross_recon_loss(pca2, pca1, X1)
    return results

def pca_recon_loss(X, pca, k): 
    Z = pca.transform(X)
    Z[:, k:] = 0
    X_hat = pca.inverse_transform(Z)

    mse = np.mean((X-X_hat)**2)
    per_sample = ((X-X_hat)**2).mean(axis=1)
    per_feature = ((X-X_hat)**2).mean(axis=0)
    var_j = X.var(axis=0, ddof=0)
    r2 = 1.0 - per_feature/var_j

    return mse, r2, X_hat

# needs to be fixed
def pca_cross_recon_loss(X, pca_enc, pca_dec, k): 
    """
        Calculates cross reconstruction loss for PCA. If pca_enc, pca_dec come from the same affine PCA transformation
        then it returns the regular reconstruction loss. They are related to each other by 
        Z = (X - mu)B and X = ZB^T + mu
    """
    B_enc, mu_enc = pca_enc.components_[:k].T, pca_enc.mean_
    B_dec, mu_dec = pca_dec.components_[:k].T, pca_dec.mean_

    Z_enc = (X - mu_enc) @ B_enc

    U, _, Vt = np.linalg.svd(B_enc.T @ B_dec, full_matrices = False)
    R = U @ Vt

    Z_dec = Z_enc @ R

    X_hat = Z_dec @ B_dec.T + mu_dec

    mse = np.mean((X-X_hat)**2)
    per_sample = ((X-X_hat)**2).mean(axis=1)
    per_feature = ((X-X_hat)**2).mean(axis=0)
    var_j = X.var(axis=0, ddof=0)
    r2 = 1.0 - per_feature/var_j

    return mse, r2, X_hat

In [7]:
# def nearest_nbr(data): 
#     n, d = data.shape
#     M_idx = np.empty(N, dtype=int)
#     M_dist = np.empty(N, dtype=float)
#     for i in range(N):
#         best_dist = np.inf
#         best_idx = -1
#         x_i = data[i]
#         for j in range(N): 
#             if i == j: 
#                 continue
#             dist = np.dot(x_i - data[j], x_i - data[j])
#             if dist < best_dist: 
#                 best_dist = dist
#                 best_idx = j
#         M_idx[i] = best_idx
#         M_dist[i] = best_dist ** 0.5

#     return dist, M_idx 

# def nearest_nbr(X): 
#     N, d = X.shape
#     D = np.full((N,N), np.inf)
#     for i in range(N): 
#         for j in range(N): 
#             if i == j: 
#                 continue 
#             else: 
#                 D[i][j] = np.sqrt(np.dot(X[i]-X[j], X[i]-X[j])) 
                
#     M_idx = np.argmin(D, axis=1)
#     D_min = D[np.arange(N), M_idx]
#     return D_min, M_idx

def nearest_nbr(X): 
    diff = X[:, None, :] - X[None, :, :]
    D2 = np.sum(diff*diff, axis=-1)

    np.fill_diagonal(D2, np.inf)

    idx_nn = np.argmin(D2, axis=1)
    dists = np.sqrt(D2[np.arange(X.shape[0]), idx_nn])

    return dists, idx_nn
    
def FalseNN(data, max_dim, delay, Atol, Rtol): # false nearest neighbor used to determine optimal embedding dimension
    # assert len(data) > 
    data = np.asarray(data).ravel()
    stdev = np.std(data)
    if stdev == 0: 
        raise ValueError("Zero Standard Deviation")

    fnn_ptg = []
    
    for m in range(1, max_dim): 
        # emb_m = delay_embed(data, m, delay)
        emb_m1 = delay_embed(data, m+1, delay)
        emb_m = emb_m1[:, :-1]  

        d_m, idx_nn = nearest_nbr(emb_m)

        new_coord, new_coord_nn = emb_m1[:, -1], emb_m1[idx_nn, -1]
        rel_growth = np.abs(new_coord - new_coord_nn) / d_m
        cond_rel = rel_growth > Rtol

        d_m1 = np.linalg.norm(emb_m1 - emb_m1[idx_nn], axis=1)
        cond_abs = (d_m1/stdev) > Atol

        fnn_ptg.append(100.0 * np.mean(cond_rel | cond_abs))
        

    return np.array(fnn_ptg)

def lyap_exp(series, dim, delay, theiler_window=0, horizon=100, fs=1.0): 
    """
        Calculate the largest Laypunov exponent of a time series using Rosenstein. 

        Params-----

        series (1d array_like): 1d input time series data
        dim, delay (int): delay embedding params
        theiler_window (int): window to exclude temporally close neighbors
        horizon (int): Number of time steps to follow the divergence
        fs (float): The sampling frequency of the series (in Hz)
    """
    traj = delay_embed(series, dim, delay)
    N = len(traj)

    dist, idx = nearest_nbr(traj)

    log_divergence = np.zeros((N, horizon))

    for i in range(N):
        nbr_idx = [i, 1]
    if abs(i - nbr_idx) > theiler_window:
        for k in range(horizon):
            if i + k < N and neighbor_idx + k < N:
                dist = np.linalg.norm(traj[i + k] - traj[nbr_idx + k])
                if dist > 0:
                    log_divergence[i, k] = np.log(dist)
                else:
                    log_divergence[i, k] = np.nan
            else:
                log_divergence[i, k] = np.nan

    avg_log_divergence = np.nanmean(log_divergence, axis=0)
    
    time_steps = np.arange(horizon)
    
    valid_idx = ~np.isnan(avg_log_divergence)
    if not np.any(valid_idx):
        return np.nan, np.array([]), np.array([]) # Not enough data

    fit = linregress(time_steps[valid_idx], avg_log_divergence[valid_idx])
    
    lyap_exp = fit.slope * fs

    return lyap_exp, avg_log_divergence, time_steps

In [8]:
class MLP(nn.Module):
    """
    A simple Multi-Layer Perceptron (MLP) with optional normalization and dropout.
    """
    def __init__(
        self,
        num_input: int,
        num_hidden: list,
        num_output: int,
        nonlinearity: str = "relu",
        norm: str = None,
        dropout: float = 0.0,
        out_nonlinearity: str = None,
        init: str = "xavier",
        bias: bool = True,
    ):
        super().__init__()
        
        # Define activation functions
        activations = {"relu": nn.ReLU(), "tanh": nn.Tanh(), "sigmoid": nn.Sigmoid()}
        self.nonlinearity = activations.get(nonlinearity)
        
        layers = []
        current_dim = num_input
        
        # Hidden layers
        for hidden_dim in num_hidden:
            layers.append(nn.Linear(current_dim, hidden_dim, bias=bias))
            
            if norm == "batch":
                layers.append(nn.BatchNorm1d(hidden_dim))
            elif norm == "layer":
                layers.append(nn.LayerNorm(hidden_dim))
                
            # Correctly append the already-instantiated activation function
            if self.nonlinearity:
                layers.append(self.nonlinearity)
            
            if dropout > 0:
                layers.append(nn.Dropout(dropout))
            
            current_dim = hidden_dim
            
        # Output layer
        layers.append(nn.Linear(current_dim, num_output, bias=bias))
        
        if out_nonlinearity:
            self.out_nonlinearity = activations.get(out_nonlinearity)
            if self.out_nonlinearity:
                layers.append(self.out_nonlinearity)

        self.net = nn.Sequential(*layers)
        
        # Weight initialization
        if init == "xavier":
            self.net.apply(self._xavier_init)
        elif init == "kaiming":
            self.net.apply(self._kaiming_init)

    def _xavier_init(self, module):
        if isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)
                
    def _kaiming_init(self, module):
        if isinstance(module, nn.Linear):
            nn.init.kaiming_uniform_(module.weight, nonlinearity='relu')
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        return self.net(x)

In [9]:
# helper functions: 
def plot_delay(data, tau): 
    x = data[:-tau]  
    y = data[tau:]  
    
    plt.scatter(x, y, s=1, alpha=0.7)  
    plt.xlabel(f'x(t)')
    plt.ylabel(f'x(t+{tau})')
    plt.title(f'Delay Embedding (τ={tau})')
    plt.show()

# def L2_matrix(X):
#     X_norm = np.sum(X**2, axis=1, keepdims=True)
    
#     return np.sqrt(X_norm + X_norm.T - 2 * X @ X.T)

def L2_matrix(X):
    X = np.asarray(X, dtype=float)
    if X.ndim == 1:
        X = X[:, None]                      
    Xn = np.sum(X*X, axis=1)
    D2 = Xn[:, None] + Xn[None, :] - 2.0 * (X @ X.T)
    np.maximum(D2, 0, out=D2) 
    
    return np.sqrt(D2)
    
# good vlues for r: 0.1 * max(d(x_i, x_j)), 0.2 * stdev(X) where x_i is a delay vector 
# X in the def below is a 1d time series
def recurrence_mat(X, r, dim=2, delay=1):
    T = delay_embed(X, dim, delay)
    D = L2_matrix(T)

    R = (D < r).astype(int)

    return R

def plot_recurrence(X, r, dim, delay, title="Recurrence Plot"):
    R = recurrence_mat(X, r, dim=dim, delay=delay)
    
    plt.figure(figsize=(8, 8))
    plt.imshow(R, cmap='binary', origin='lower')
    plt.title(title)
    plt.xlabel('Time i')
    plt.ylabel('Time j')
    plt.colorbar(label='Recurrence')
    plt.show()
    

def calculate_pca_loss_to_dim(X, pca, dim_min, dim_max): 
    pca_recon_loss_list = []
    for i in range(dim_min, dim_max+1): 
        loss, _, _ = pca_recon_loss(X, pca, i)
        pca_recon_loss_list.append(loss)

    return pca_recon_loss_list

def calculate_pca_feature_loss_to_dim(X, pca, dim_min, dim_max): 
    N = dim_max - dim_min + 1
    k = X.shape[1]
    feature_loss_mat = np.zeros((N, k))
    for i in range(N): 
        _, feature_loss, _ = pca_recon_loss(X, pca, i+dim_min)
        feature_loss_mat[i] = feature_loss

    return feature_loss_mat

def plot_pca_recon_loss(X, pca, dim_min, dim_max): 
    res = calculate_pca_loss_to_dim(X, pca, dim_min, dim_max)

    plt.title("PCA Reconstruction Loss (MSE) vs. Dimension")
    plt.xlabel("Dimension")
    plt.ylabel("MSE")
    plt.plot(np.arange(dim_min, dim_max+1), res, '.')
    plt.show()

### Loss and Error Functions

In [10]:
def pe(y, y_pred): 
    y, y_pred = np.asarray(y), np.asarray(y_pred)
    return (y_pred - y) / y

def ape(y, y_pred): 
    y, y_pred = np.asarray(y), np.asarray(y_pred)
    return np.abs(y_pred - y) / np.abs(y)

def squared_error(y, y_pred): 
    y, y_pred = np.asarray(y), np.asarray(y_pred)
    return (y_pred - y)**2

def MAPE(y, y_pred): 
    return np.mean(ape(y, y_pred))

def MSE(y, y_pred): 
    return np.mean(squared_error(y, y_pred))

class PredError:
    """
    A class to encapsulate prediction error calculations and plotting.
    """
    def __init__(self, y_true, y_pred):
        """
        Initializes the class and calculates all relevant error metrics.
        """
        self.y_true = np.asarray(y_true)
        self.y_pred = np.asarray(y_pred)

        # This private method is called to compute all errors at once
        self._calculate_all_errors()

    def _calculate_all_errors(self):
        """Calculates pointwise and aggregate errors."""
        # Pointwise errors (for plotting)
        self.pe = pe(self.y_true, self.y_pred)
        self.ape = ape(self.y_true, self.y_pred)
        self.squared_error = squared_error(self.y_true, self.y_pred)
        
        # Aggregate errors (for reporting)
        self.mape = MAPE(self.y_true, self.y_pred)
        self.mse = MSE(self.y_true, self.y_pred)

    def plot(self, error_type='squared'):
        """
        Plots a specific type of pointwise error.
        
        Args:
            error_type (str): The error to plot. Can be 'pe', 'ape', 
                              or 'squared_error'.
        """
        error_data = {
            'pe': self.pe,
            'ape': self.ape,
            'squared': self.squared_error
        }
        
        titles = {
            'pe': 'Pointwise Percentage Error (PE)',
            'ape': 'Pointwise Absolute Percentage Error (APE)',
            'squared': 'Pointwise Squared Error'
        }

        if error_type not in error_data:
            valid_types = ", ".join(error_data.keys())
            raise ValueError(f"Invalid error_type '{error_type}'. Please choose from: {valid_types}")

        plt.figure(figsize=(12, 6))
        plt.plot(error_data[error_type])
        plt.title(f"{titles[error_type]} on Test Set")
        plt.xlabel("Time Step")
        plt.ylabel(error_type.upper())
        plt.grid(True)
        plt.show()

## Time Series Data

In [22]:
def logistic_map(r, x_0, max_iter=1000):
    """Generates a time series using the logistic map equation."""
    traj = [x_0]
    for i in range(1, max_iter):
        x = r * traj[i-1] * (1 - traj[i-1])
        traj.append(x)
    return np.array(traj, dtype=np.float32)

def laser_map(g, x_0, max_iter=1000):
    """Generates a time series using the logistic map equation."""
    traj = [x_0]
    for i in range(1, max_iter):
        x = g * traj[i-1] * (1 - np.tanh(traj[i-1]))
        traj.append(x)
    return np.array(traj, dtype=np.float32)

def weierstrass(x, a, b, max_n):
    """Generates a time series using the Weierstrass function."""
    x = np.asarray(x, dtype=float)
    res = np.zeros_like(x, dtype=np.float32)
    for i in range(max_n):
        res += a**i * np.cos(b**i * np.pi * x)
    return res

def lorenz(t, xyz, sigma=10, rho=28, beta=8/3):
    x, y, z = xyz
    dxdt = sigma * (y - x)
    dydt = x * (rho - z) - y
    dzdt = x * y - beta * z
    return [dxdt, dydt, dzdt]

def generate_lorenz(initial_state=[1.0, 0.0, 0.0], sigma=10.0, rho=28.0, beta=8.0/3.0, dt=0.01, num_steps=10000):

    t_span = [0, num_steps * dt]
    t_eval = np.arange(0.0, num_steps * dt, dt)

    solution_obj = solve_ivp(
        fun=lorenz, 
        t_span=t_span, 
        y0=initial_state, 
        args=(sigma, rho, beta),
        t_eval=t_eval
    )
    
    solution = solution_obj.y.T
    
    return solution

def plot_lorenz_attractor(data):
    fig = plt.figure(figsize=(12, 9))
    ax = fig.add_subplot(111, projection='3d')
    
    x = data[:, 0]
    y = data[:, 1]
    z = data[:, 2]
    
    ax.plot(x, y, z, lw=0.5, color='deepskyblue')
    
    # Set titles and labels for clarity
    ax.set_title("Lorenz Attractor", fontsize=16)
    ax.set_xlabel("X Axis")
    ax.set_ylabel("Y Axis")
    ax.set_zlabel("Z Axis")
    
    plt.show()

def mackey_glass_discrete(N=10000, tau=17, beta=0.2, gamma=0.1, n=10, x0=1.2):
    size = N + tau + 1
    x = np.empty(size, dtype=float)
    x[:tau+1] = float(x0) 

    for t in range(tau, size - 1):
        x_tau = x[t - tau]
        x[t + 1] = (1.0 - gamma) * x[t] + beta * x_tau / (1.0 + x_tau**n)

    return x[tau+1:]

def double_sinusoid(x, phi=0.07, psi=np.sqrt(2)/10): 
    x = np.asarray(x, dtype=float)
    return np.sin(2*np.pi*phi*x) + 0.6*np.sin(2*np.pi*psi*x)

def chirp(x, phi=0.02, psi=5*1e-5): 
    x = np.asarray(x, dtype=float)
    return np.sin(2*np.pi*(phi*x + psi/2*x**2))

def generate_timeseries(key, length): 
    if key == 'logistic':
        print("Using Logistic Map dataset.")
        series = logistic_map(r=3.8282, x_0=0.5, max_iter=length)
    elif key == 'weierstrass':
        print("Using Weierstrass Function dataset.")
        x_vals = np.linspace(-2, 2, length)
        series = weierstrass(x=x_vals, a=0.5, b=3, max_n=15)
    elif key == 'doublesine': 
        print("Using double sinusoid dataset.")
        x_vals = np.linspace(-1000, 1000, length)
        series = double_sinusoid(x_vals)
    elif key == 'chirp': 
        print("Using chirp dataset.")
        x_vals = np.linspace(-1000, 1000, length)
        series = chirp(x_vals)
    elif key == 'macglass': 
        print("Using discrete Mackey-Glass dataset.")
        series = mackey_glass_discrete(N=length, tau=17, beta=0.2, gamma=0.1, n=10, x0=1.2)
    elif key == 'laser':
        print("Using the laser-logistic map dataset.")
        series = laser_map(g=20, x0=0.5, max_iter=length)
    else:
        raise ValueError("Unknown dataset specified.")
        
    return series 

In [12]:
def plot_bifur_logistic(param_values, x0=0.5, iter_trans=1000, iter_plot=256):
    """
        Plots the bifurcation diagram for logistic map.
    """
    iter_total = iter_trans + iter_plot

    fig, ax = plt.subplots(figsize=(15,8), dpi=150)

    for r in param_values:
        X = logistic_map(r, x0, iter_total)
        attractors = X[iter_trans:]
        plot_values_r = np.full(iter_plot, r)

        ax.plot(plot_values_r, attractors,
                marker=',', color='black', linestyle='none', alpha=0.5)

    ax.set_title("Bifurcation Diagram of the Logistic Map", fontsize=16)
    ax.set_xlabel("Growth Rate Parameter (r)", fontsize=12)
    ax.set_ylabel("Equilibrium Population (x)", fontsize=12)
    ax.set_xlim(param_values[0], param_values[-1])
    ax.set_ylim(0, 1)
    plt.show()

def plot_bifur_laser(param_values, x0=0.5, iter_trans=1000, iter_plot=256):
    """
        Plots the bifurcation diagram for logistic map.
    """
    iter_total = iter_trans + iter_plot

    fig, ax = plt.subplots(figsize=(15,8), dpi=150)

    for r in param_values:
        X = laser_map(r, x0, iter_total)
        attractors = X[iter_trans:]
        plot_values_r = np.full(iter_plot, r)

        ax.plot(plot_values_r, attractors,
                marker=',', color='black', linestyle='none', alpha=0.5)

    ax.set_title("Bifurcation Diagram of the Logistic Map", fontsize=16)
    ax.set_xlabel("Growth Rate Parameter (r)", fontsize=12)
    ax.set_ylabel("Equilibrium Population (x)", fontsize=12)
    ax.set_xlim(param_values[0], param_values[-1])
    plt.show()