In [None]:
"""Sandbox module."""
from functools import partial

import matplotlib.pyplot as plt
import numpy as np
import skfda
from scipy import stats
from skfda.preprocessing.dim_reduction import FPCA

%matplotlib inline

# Nonparametric Statistics

## Kernel density estimation

In [None]:
kernels = {
    "Epanechnikov": lambda u: 3
    / (4 * np.sqrt(5))
    * (1 - (u**2) / 5)
    * int(abs(u) <= np.sqrt(5)),
    "Uniform": lambda u: 0.5 * int(abs(u) <= 1),
    "Triangular": lambda u: (1 - abs(u)) * int(abs(u) <= 1),
}

In [None]:
def kernel_estimator(x, h, sample, kernel_type):
    """Kernel density estimator function."""
    k = np.vectorize(kernels[kernel_type])
    return 1 / (len(sample) * h) * np.sum(k((x - sample) / h))

### Fix parameters and generate sample

In [None]:
n = 200
n_grid = 100
grid_ending = 10
mu = 0
sigma = 10

sample = np.random.default_rng(seed=28071995).normal(loc=mu, scale=sigma, size=n)
grid = np.linspace(start=-grid_ending, stop=grid_ending, num=n_grid)
# Rule-of-Thumb bandwidth (Li and Racine 2007, p. 66)
bandwidth = np.std(sample) * (n ** (-0.2))  # should implement optimal bandwidth

In [None]:
kernel_estimator_given_sample = partial(kernel_estimator, sample=sample)

### Generate fitted values

In [None]:
values_epa = [
    kernel_estimator_given_sample(x=i, h=bandwidth, kernel_type="Epanechnikov")
    for i in grid
]
values_uni = [
    kernel_estimator_given_sample(x=i, h=bandwidth, kernel_type="Uniform") for i in grid
]
values_tri = [
    kernel_estimator_given_sample(x=i, h=bandwidth, kernel_type="Triangular")
    for i in grid
]

### Plots

In [None]:
fig, ax = plt.subplots()
ax.plot(grid, values_epa, label="Epanechnikov")
ax.plot(grid, stats.norm.pdf(grid, loc=mu, scale=sigma), label="True density")
# plot histogram for comparison
ax.hist(
    sample,
    bins=grid,
    density=True,
    histtype="step",
    edgecolor="black",
    linewidth=0.5,
    label="Histogram",
)
plt.legend()
plt.show()

## Kernel Regression

Context: we want to investigate the nonparametric regression relation $y_i = m(x_i) +
\epsilon_i$, where $y_i$ is a dependent variable, $x_i$ an explanatory variable, and
$\epsilon_i$ an iid error term, for observations $i = 1, ..., n$.

### Sample generation

In [None]:
def m(x):
    """True function."""
    return 3 * np.sin(x) + 2 * x

In [None]:
epsilon = np.random.default_rng(seed=28071995).normal(0, sigma / 2, size=n)
y = m(sample) + epsilon

In [None]:
def nw_estimator(x, y, h, sample, kernel_type):
    """Nadaraya - Watson / Local constant estimator."""
    k = np.vectorize(kernels[kernel_type])
    k0 = k((x - sample) / h)
    numerator = np.sum(k0 * y)
    denominator = np.sum(k0)
    return numerator / denominator

In [None]:
def ll_estimator(x, y, h, sample, kernel_type):
    """Local linear estimator. See Li & Racine, p. 81."""
    k = np.vectorize(kernels[kernel_type])
    k_0 = k((x - sample) / h)
    s_2 = np.sum(k_0 * (sample - x) ** 2)
    s_1 = np.sum(k_0 * (sample - x))
    w = k_0 * (s_2 - s_1 * (sample - x))
    numerator = np.sum(w * y)
    denominator = np.sum(w)
    return numerator / denominator

In [None]:
def ll_estimator2(x, y, h, sample, kernel_type):
    """Local linear estimator. See Li & Racine, p. 81."""
    k = np.vectorize(kernels[kernel_type])
    w = np.diag(k((sample - x) / h))
    z = np.array((np.ones(len(sample)), sample - x))
    return np.linalg.inv(z.dot(w).dot(z.transpose())).dot(z).dot(w).dot(y)

In [None]:
def loocv_error(h, y, sample, kernel_type):
    """Compute the LOOCV error for a given bandwidth."""
    error_nw = 0
    error_ll = 0
    estimator_nw = partial(nw_estimator, h=h, sample=sample, kernel_type=kernel_type)
    estimator_ll = partial(ll_estimator, h=h, sample=sample, kernel_type=kernel_type)

    # For each observation
    for i in range(len(sample)):
        # Create a new sample excluding the current observation
        sample_loo = np.delete(sample, i)
        y_loo = np.delete(y, i)

        # Update the sample and y in the estimator
        estimator_nw.keywords["sample"] = sample_loo
        estimator_nw.keywords["y"] = y_loo
        estimator_ll.keywords["sample"] = sample_loo
        estimator_ll.keywords["y"] = y_loo

        # Compute the prediction for the left-out observation
        prediction_nw = estimator_nw(x=sample[i])
        prediction_ll = estimator_ll(x=sample[i])

        # Add the squared error to the total error
        if not np.isnan(prediction_nw):
            error_nw += (y[i] - prediction_nw) ** 2
        if not np.isnan(prediction_ll):
            error_ll += (y[i] - prediction_ll) ** 2

    # Return the average error
    return error_nw / len(sample), error_ll / len(sample)


# List of bandwidths to consider
h_values = np.linspace(0.2, 2, 100)

# Compute the LOOCV error for each bandwidth
loocv_part = partial(loocv_error, sample=sample, y=y, kernel_type="Epanechnikov")
loocv_vec = np.vectorize(loocv_part)
errors = loocv_vec(h_values)

# Choose the bandwidth with the smallest error
loocv_h_nw, loocv_h_ll = h_values[np.argmin(errors, axis=1)]
f"Optimal bandwidth: {loocv_h_nw, loocv_h_ll}"

In [None]:
temp1 = partial(
    nw_estimator,
    y=y,
    h=loocv_h_nw,
    sample=sample,
    kernel_type="Epanechnikov",
)
temp2 = np.vectorize(temp1)
temp3 = partial(
    ll_estimator,
    y=y,
    h=loocv_h_ll,
    sample=sample,
    kernel_type="Epanechnikov",
)
temp4 = np.vectorize(temp3)

In [None]:
fig, ax = plt.subplots()
ax.plot(grid, m(grid), label="True relation")
ax.plot(grid, temp2(x=grid), label="Nadaraya-Watson estimator")
ax.plot(grid, temp4(x=grid), label="Local linear estimator")
plt.legend()
plt.show()

Apparently the Leave-One-Out Cross Validation algorithm gets stuck in a local optimum with very small bandwidths, at least for the Nadaraya-Watson estimator. I don't understand why, maybe because it can get perfect in sample fit if the observations are not so dense?

# Functional Data Analysis

Ideas for simulation
- Uni- vs. Multivariate case
- Simulate different normal distributions
- Vary parameters of (generalized) Beta distribution, so principal components can be interpreted as varying parameters

$X(t) = \sum_{k=1}^n η_k φ_k(t)$


## Transformation Method Paper (Petersen & Müller 2016)

In [None]:
# Equispaced grid on [0, 1]
gridnum = 100
grid_fda = np.linspace(start=-np.ones(n), stop=np.ones(n), num=gridnum)
grid_univ = np.linspace(start=-1, stop=1, num=gridnum)


# Define normal density
def norm_density(x, mu, sigma):
    """Define normal density function.

    To test: columns of x must align with mu and sigma.
    """
    x = np.array(x)  # to vectorize the input
    mu = np.array(mu)
    sigma = np.array(sigma)
    return np.reciprocal(np.sqrt(2 * np.pi) * sigma) * np.exp(
        (-0.5) * ((x - mu) / sigma) ** 2,
    )

In [None]:
# Draw different sigmas
log_sigmas = np.random.default_rng(seed=28071995).uniform(-1.5, 1.5, n)
mus = np.zeros(n)
sigmas = np.exp(log_sigmas)
densities_discretized = norm_density(grid_fda, mus, sigmas).transpose()
densities_discretized[0], sigmas[0]

In [None]:
# Do FPCA via package
fpca_discretized = FPCA(n_components=1)
fd = skfda.FDataGrid(densities_discretized)
fd.dataset_name = "Density Samples"
fd.argument_names = ("x-Value",)
fd.coordinate_names = ("Density",)
fpca_discretized.fit(fd)
fpca_discretized.components_.plot()

In [None]:
# Sample densities
partial_vectorized = np.vectorize(partial)
densities = partial_vectorized(norm_density, mu=mus, sigma=sigmas)

In [None]:
# Manual computation of FPCs
# Sample Mean:
def sample_mean(x: float, sample_funcs: np.ndarray) -> float:
    """Compute mean function."""
    sum_of_funcs = 0
    for f in sample_funcs:
        sum_of_funcs += f(x)
    return 1 / len(sample_funcs) * sum_of_funcs

In [None]:
def sample_cov_func(x: float, y: float, mean_func: callable, sample_funcs: np.ndarray):
    """Compute covariance function."""
    x = np.array(x)
    y = np.array(y)
    mean_x = mean_func(x, sample_funcs)
    mean_y = mean_func(y, sample_funcs)
    sum_cross_products = 0
    sum_x_evals = 0
    sum_y_evals = 0
    for f in sample_funcs:
        sum_cross_products += f(x) * f(y)
        sum_x_evals += f(x)
        sum_y_evals += f(y)
    return mean_x * mean_y + 1 / len(sample_funcs) * (
        sum_cross_products - mean_x * sum_y_evals - mean_y * sum_x_evals
    )

In [None]:
def sample_cov_func2(x: float, y: float, mean_func: callable, sample_funcs: np.ndarray):
    """Compute covariance function."""
    x = np.array(x)
    y = np.array(y)
    mean_x = mean_func(x, sample_funcs)
    mean_y = mean_func(y, sample_funcs)
    sum_products = 0
    for f in sample_funcs:
        sum_products += (f(x) - mean_x) * (f(y) - mean_y)
    return 1 / len(sample_funcs) * sum_products

In [None]:
# See whether they agree on the the result, they should
sample_cov_func(0, 0.5, sample_mean, densities), sample_cov_func2(
    0,
    0.5,
    sample_mean,
    densities,
)

In [None]:
%%timeit
approx_cov_func = np.zeros((gridnum, gridnum))
for i in range(gridnum):
    approx_cov_func[i] = sample_cov_func(
        grid_univ,
        grid_univ[i],
        sample_mean,
        densities,
    )

In [None]:
%%timeit
approx_cov_func2 = np.zeros((gridnum, gridnum))
for i in range(gridnum):
    approx_cov_func2[i] = sample_cov_func2(
        grid_univ,
        grid_univ[i],
        sample_mean,
        densities,
    )

In [None]:
%%timeit
approx_cov_np = np.cov(densities_discretized.transpose())

Numpy command obviously much faster, so won't use my own functions.

Now,

## Estimation of Eigenvectors

In [None]:
# Compute covariance matrix
approx_cov_np = np.cov(densities_discretized.transpose())

In [None]:
eig_vals, eig_vecs = np.linalg.eigh(approx_cov_np)
eig_vals2, eig_vecs2 = np.linalg.eigh(approx_cov_np)
eig_vals_sorted = eig_vals[np.argsort(-eig_vals)]
eig_vecs_sorted = eig_vecs[:, np.argsort(-eig_vals)]

In [None]:
diff = eig_vecs_sorted[:, 0] - eig_vecs[:, np.argmax(-eig_vals)]

$\int_0^1 f(t)dt$  wird dann durch die Riemann Summe $1/m \sum_{j=1}^m f(s_j)$ ersetzt ($s_j$  - Gridpunkte, $m$ -  Anzahl der Gridpunkte).

In [None]:
def riemann_sum(a, b, m, f, method="left"):
    """Compute integral."""
    stepsize = (b - a) / m
    if method == "left":
        grid = np.linspace(a, b - stepsize, m)
    elif method == "right":
        grid = np.linspace(a + stepsize, b, m)
    else:
        msg = "Must specify either left or right Riemann sum!"
        raise ValueError(msg)
    return sum(f(grid) * stepsize)

In [None]:
def riemann_sum2(vector):
    """Compute integral."""
    return sum(vector * 1 / len(vector))

In [None]:
riemann_sum(vector=eig_vecs_sorted[:, 0] ** 2)

In [None]:
# 1. Generate synthetic functional data
n_curves = 100
n_points = 100
x = np.linspace(0, 1, n_points)
# Generate curves with some random variation around a sine curve
data = np.array(
    [
        np.sin(4 * np.pi * x) + 0.5 * np.random.default_rng(_).normal(0, 0.5, len(x))
        for _ in range(n_curves)
    ],
)

# 2. Compute the mean function
mean_function = np.mean(data, axis=0)

# 3. Center the data
centered_data = data - mean_function

# 4. Estimate the covariance function using a discrete approximation
cov_matrix = np.cov(centered_data, rowvar=False)

# 5. Compute the eigenfunctions (principal components) of the covariance matrix
eigenvalues, eigenfunctions = np.linalg.eigh(cov_matrix)

# Sort eigenvalues and eigenfunctions in decreasing order
eigenvalues = eigenvalues[::-1]
eigenfunctions = eigenfunctions[:, ::-1]

# 6. Project the centered data onto the eigenfunctions
fpca_scores = np.dot(centered_data, eigenfunctions)

# Plot the mean function and the first two eigenfunctions
plt.figure(figsize=(12, 6))
plt.subplot(1, 3, 1)
plt.plot(x, mean_function, "b-")
plt.title("Mean Function")

plt.subplot(1, 3, 2)
plt.plot(x, eigenfunctions[:, 0], "r-")
plt.title("1st Eigenfunction")

plt.subplot(1, 3, 3)
plt.plot(x, eigenfunctions[:, 1], "g-")
plt.title("2nd Eigenfunction")

plt.tight_layout()
plt.show()

In [None]:
np.linalg.norm(eigenfunctions[:, 0])