# About
Estimation of distribution of dynamical and diffusional parameters of Ito's 
processes. This approach don't imply any functional representation of parameters
as in most of the know models.

## Brief overview

$S(t) = S_t$ is called an Ito process if it satisfies next equality:
$$ 
dS_t = a(t, S_t) \cdot dt + b(t, S_t) \cdot dW 
$$ (Ito_proc)


We can create estimate $a(t, S_t)$ and $b(t, S_t)$ in two ways:
1. By their distribution
2. Point-to-point


## 1. Estimation of Process distribution

Let $S(t_i) = S_i, \quad a(t_i, S_i) = a_i, \quad b(t_i, S_i) = b_i$ and
$ i \in \{1, ..., T\}$. Then
$$
\Delta S = S_i - S_{i-1} \approx a(t_i, S_i) \Delta t + b(t_i, S_i) \Delta W 
$$ (my_label)

$$
\Rightarrow
\mathbb{P}(\Delta S < x) \approx \mathbb{P}(\Delta W < \frac{x - a_i \Delta t}{b_i}),
\quad \text{where } \Delta W \sim \mathcal{N}(0,\Delta t)\,
$$

Here $\Delta t$ denotes time discretization step.

### Law of total probability

Let denote $\frac{x - a_i \Delta t}{b_i} = \xi(x, a_i, b_i, t)= \xi$. It is a r.v. 
Therefore $\mathbb{P}(\Delta W < \xi) \equiv \Phi(\frac{\xi - 0}{\Delta t})$

Keeping in mind that $a_i$ and $b_i$ are r.v. Without loss of generality 
let's suppose that $\forall t: \; a_i \in A \subset \mathbb{R} \wedge b_i \in B \subset \mathbb{R}$.

Let $f_{ab}$ denote joint distribution density for r.v. $a_i$ and $b_i$.
Then
$$
\mathbb{P}(\Delta W < \xi) = 
\int_{A}\int_{B} \, \mathbb{P}(\Delta W < \xi \: | \:a_i=y \wedge b_i=z) \cdot f_{ab}(y, z)\, dy \, dz =
\newline
\int_{A}\int_{B} \, \Phi(\frac{x - y \Delta t}{z \Delta t}) \cdot f_{ab}(y, z)\, dy \, dz =
\{\;f_{ab}(y_k, z_k) = p_k \;\} \approx
\sum_{k=1}^{K}{p_k \cdot \Phi \left( \frac{x - y_k \Delta t}{z_k \Delta t} \right)} 
$$

$$
\text{Small note: as a result we can get that} \quad
\mathbb{P}(dW < \xi) = 
\mathbb{E}_{a,b}(\Phi(\frac{x - a \cdot dt}{b \cdot dt})) 
$$

Therefore we can approximate initial process distribution as:

$$
\mathbb{P}(\Delta S < x) \approx
\sum_{k=1}^{K}{p_k \cdot \Phi \left( \frac{x - y_k \Delta t}{z_k \Delta t} \right)} 
$$

## Aspects to consider
1. How to choose limits for $A$ and $B$? Is it bad to consider them constant for
each $t$?
2. How to choose points $\{(y_k, z_k)\}_{k=1}^{K}$? Obviously the best approach 
is to select extremum of the joint distribution function $f_{ab}$.


In [None]:
from finito.simulator import generateGeneralWiener
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
import seaborn as sns

# Generate process
dummyS = generateGeneralWiener(
    a=3, b=0.4, dt=np.timedelta64(1, "ms"), T=np.timedelta64(10, "s")
)
print("Shape of data:", dummyS.shape)

#### 1.1. EM-algorithm

In [None]:
# Take differences
deltaS = np.diff(dummyS)

# Take a look on result
f, ax = plt.subplots(3, 1, figsize=(10, 10))
ax[0].plot(dummyS)
ax[0].set_title("Original process")
ax[1].plot(deltaS)
ax[1].set_title("Differences")
sns.histplot(deltaS, kde=True, ax=ax[2])
ax[2].set_title("Distribution of differences")

plt.show()

In [None]:
from sklearn.mixture import GaussianMixture

gmm = GaussianMixture(
    n_components=1,
)

gmm.fit(deltaS.reshape(-1, 1))

print(
    "Model parameters after fitting:",
    f"Means: {gmm.means_[0]}",
    f"Covariances: {gmm.covariances_.flatten()}",
    sep="\n",
)
__time_delta = np.timedelta64(1, "s") / np.timedelta64(1, "ms")
print(
    "As we can see it differs from initial a and b parameters on time delta:",
    __time_delta,
)
print(
    f"Means: {gmm.means_ * __time_delta}",
    f"Covariances: {gmm.covariances_ * __time_delta}",
    f"Standard deviation: {np.sqrt(gmm.covariances_ * __time_delta)}",
    sep="\n",
)

### 1.2. Minimization problem on distribution

Discrete approximation $F_{\Delta S, T}(x)$ of the continuous distribution 
$F_{dS}(x)$ can be estimated in two ways: theoretical and empirical. 

On one hand:
$$
F^{(theor)}_{\Delta S, T}(x) = \mathbb{P}(\Delta S < x) \approx 
\sum_{k=1}^{K}{p_k \cdot \Phi \left( \frac{x - y_k \Delta t}{z_k \Delta t} \right)} 
\xrightarrow{K \rightarrow \infty , \,\Delta t \rightarrow 0} F_{dS}
$$

But on the other we can estimate empirical distribution of $\Delta S$ as:

$$
F^{(emp)}_{\Delta S, T}(x) = 
\frac{1}{T} \cdot \sum_{j=1}^{T} \mathbb{I} \left(\Delta S < x\right)
\xrightarrow{\Delta t \rightarrow 0} F_{dS}
$$

For evaluation of parameters $V_k = \{a_k, b_k, p_k\}$ we can set an 
minimization problem on distance (in some metric $\rho(f,g)$) between
theoretical and empirical distributions:

$$
\min_{\{V_k\}_{k=1}^{K}} \rho \left(F^{(emp)}_{\Delta S, T}, F^{(theor)}_{\Delta S, T}\right)
$$

#### 1.2.1. Empirical distribution implementation

In [None]:
# TODO: Optimize cycle. Currently O(M*N), can be O(N), where M is len(x), N is len(data)
from typing import Iterable


def EmpiricalCDF(data: Iterable, x: Iterable | None = None):
    """Estimate cumulative distribution function.

    Args:
        data (Iterable): Samples of the random variable
        x (Iterable | None): Specific points to calculate distribution in

    Returns:
        _type_: _description_
    """
    # Use all possible information for estimation
    if x is None:
        x = data
    res = []
    nSamples = len(data)
    for xi in x:
        res.append(np.sum(data <= xi) / nSamples)
    res.sort()
    return np.array(res)

In [None]:
# Represent empirical distribution function correspondence to theoretical
# distributions for a basic distributions
fig, ax = plt.subplots(1, 2, figsize=(10, 5))

# Binomial
ax[0].set_title("Binomial distribution")
ax[1].set_title("Normal distribution")
binomRV = stats.binom(n=10, p=0.5)
X = np.arange(-3, 13)
sns.pointplot(x=X, y=binomRV.cdf(X), ax=ax[0], color="blue")
nSamples = 240
binomSamples = binomRV.rvs(size=nSamples)
X = np.arange(min(binomSamples), max(binomSamples))
Xm = X[::1]  # To select discretization step
sns.pointplot(x=Xm, y=EmpiricalCDF(binomSamples, Xm), ax=ax[0], color="orange")
# Standard normal
nSamples = 200
normRV = stats.norm(loc=0, scale=1)
X = np.linspace(-3, 3, nSamples)
sns.pointplot(x=X, y=normRV.cdf(X), ax=ax[1], color="blue")
normSamples = normRV.rvs(size=nSamples)
Xm = X[::10]
sns.pointplot(x=Xm, y=EmpiricalCDF(normSamples, Xm), ax=ax[1], color="orange")
ax[1].tick_params(axis="x", which="both", labelsize=0)

#### 1.2.2 Gaussian mixture model implementation

In [None]:
class GaussianMixture:
    def _CheckGMParameters(a, b, p) -> None:
        assert a is not None and b is not None, "Means and variances must be provided"
        assert a.shape == b.shape == p.shape, "Parameters must have the same length"
        # Variances
        assert np.all(b > 0), "Variances must be positive"
        # Probabilities
        assert np.isclose(np.sum(p), 1.0), "Weights must sum to 1"
        assert np.all(p >= 0), "Weights must be non-negative"

    def __init__(self, a, b, p=None):
        """Initialize a Gaussian Mixture Model (GMM) with given parameters.

        Parameters:
            a (list[float]): means of the components
            b (list[float]): variances of the components
            p (list[float] | None): weights of the components (should sum to 1).
                If None, random weights will be generated.
        """
        a = np.asarray(a).reshape(-1, 1)  # (K, 1)
        b = np.asarray(b).reshape(-1, 1)  # (K, 1)
        if p is None:
            # Generate random weights that sum to 1
            p = np.random.dirichlet(np.ones(b.shape[0]))
        p = np.asarray(p).reshape(-1, 1)  # (K, 1)
        __class__._CheckGMParameters(a, b, p)
        self._means = a
        self._variances = b
        self._weights = p
        self.n_components = a.shape[0]
        self._gaussians = stats.norm(loc=self._means, scale=np.sqrt(self._variances))

    def cdf(self, x):
        """Compute the cumulative distribution function for the GMM at points x."""
        return np.sum(self._weights * self._gaussians.cdf(x), axis=0)

    def pdf(self, x):
        """Compute the probability density function for the GMM at points x."""
        return np.sum(self._weights * self._gaussians.pdf(x), axis=0)

    def sample(self, size, random_state=None):
        """Generate random samples from the GMM."""
        # Choose what gaussian to use for generation based on components weights
        component_indices = np.random.choice(
            self.n_components, size=size, p=self._weights.flatten()
        )
        # Generate samples from the chosen components
        samples = np.array(
            [
                self._gaussians.rvs(
                    size=(self.n_components, 1), random_state=random_state
                )[idx][0]
                for idx in component_indices
            ]
        )
        return samples

    def get_params(self):
        """Return the parameters of the GMM as a tuple (means, variances, weights)."""
        return self._means.flatten(), self._variances.flatten(), self._weights.flatten()

In [None]:
a = [-1, 3, 5]
b = [0.3, 0.5, 0.2]

gm = GaussianMixture(a=a, b=b)
r = gm.get_params()
np.stack(r)
gm.sample(10)

In [None]:
a = [-1, 3, 5]
b = [0.3, 0.5, 0.2]

gm1 = GaussianMixture(a=a, b=b)
x_grid = np.linspace(-3, 7, 100)
cdf_values = gm1.cdf(x_grid)
print(
    "Plots are different time to time due to random weights generation.",
    f"\nCurrent weights: {gm1._weights.flatten()}",
)

pdf_values = gm1.pdf(x_grid)

plt.figure(figsize=(8, 5))
plt.plot(x_grid, pdf_values, label="GMM PDF", color="blue")
plt.title("PDF of Gaussian Mixture Model")
plt.xlabel("x")
plt.ylabel("PDF")
plt.legend()
plt.grid()
plt.show()

sns.histplot(gm1.sample(size=1000), kde=True)

In [None]:
from sklearn.metrics import mean_absolute_error

samples = gm1.sample(100)
x = sorted(samples)
plt.figure(figsize=(8, 5))
origCDF = gm1.cdf(x)
plt.plot(x, origCDF, label="Original CDF", color="black")
empirCDF = EmpiricalCDF(samples)
plt.plot(x, empirCDF, label="Empirical CDF", color="green")
plt.title("CDF of Gaussian Mixture Model")
plt.xlabel("Random variable values")
plt.ylabel("Probability")
plt.legend()
plt.grid()
plt.show()

print(
    f"Number of samples: {len(samples)}",
    f"Mean absolute error: {mean_absolute_error(origCDF, empirCDF) * 100:.2f}%",
    sep="\n",
)

In [None]:
from scipy.optimize import minimize
from scipy.optimize import Bounds


def distance(params, x, dist=np.abs):
    mu, sigma, weight = params.reshape(3, 3, -1)
    # Evaluate KS statistic: max |F_n(t) - F(t; mu, sigma)|
    # We only need to check at data points (and possibly midpoints); a common simple approach:
    # F_n = np.arange(1, n+1) / n
    F_n = EmpiricalCDF(x, x)
    # Values t_i = x_sorted
    gaus = stats.norm(loc=mu, scale=sigma)
    F_theory = np.sum(weight * gaus.cdf(x), axis=0)
    # Evaluate KS statistic: max |F_n(t) - F(t; mu, sigma)|
    ks_vals = dist(F_n - F_theory)
    # Also check left limits just before each x_i (optional for more accuracy)
    # We skip for simplicity; KS is often evaluated at data points with this approach.
    return float(np.max(ks_vals))


def distF(x):
    return lambda params: distance(params, x)


initial_guess = [[0, 1, 2], [1, 1, 1], np.random.dirichlet(np.ones(3))]

initial_guess = np.array(initial_guess).flatten()


bounds = Bounds(
    [-10, -10, -10, 1e-3, 1e-3, 1e-3, 0, 0, 0], [10, 10, 10, 10, 10, 10, 1, 1, 1]
)

method = "nelder-mead"
x = gm1.sample(size=1000)
res = minimize(distF(x), initial_guess, method=method, bounds=bounds)

In [None]:
print(res.x.reshape(3, -1), gm1.get_params())

In [None]:
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize


def ecdf(x):
    # returns a function F_hat(t) that evaluates the empirical CDF at t
    x_sorted = np.sort(x)
    n = x.size

    def F_hat(t):
        # proportion of observations <= t
        return np.searchsorted(x_sorted, t, side="right") / n

    return F_hat


def ks_distance(params, x):
    mu, sigma = params
    if sigma <= 0:
        return np.inf
    # empirical CDF
    x_sorted = np.sort(x)
    # Evaluate KS statistic: max |F_n(t) - F(t; mu, sigma)|
    # We only need to check at data points (and possibly midpoints); a common simple approach:
    # F_n = np.arange(1, n+1) / n
    F_n = EmpiricalCDF(x_sorted, x_sorted)
    # Values t_i = x_sorted
    F_theory = norm.cdf(x_sorted, loc=mu, scale=sigma)
    ks_vals = np.abs(F_n - F_theory)
    # Also check left limits just before each x_i (optional for more accuracy)
    # We skip for simplicity; KS is often evaluated at data points with this approach.
    return float(np.max(ks_vals))


def cvm_distance(params, x):
    mu, sigma = params
    if sigma <= 0:
        return np.inf
    x_sorted = np.sort(x)
    n = x.size
    # CvM distance approximation using the Watson/CvM formula:
    # For ECDF F_n and theoretical F, one form is:
    # CvM = (1/n) * sum_{i=1}^n [F_n(x_(i)) - F(x_(i); mu,sigma))]^2
    # Here F_n(x_(i)) = i/n
    F_theory = norm.cdf(x_sorted, loc=mu, scale=sigma)
    CvM = np.mean((np.arange(1, n + 1) / n - F_theory) ** 2)
    return float(CvM)


def fit_normal_by_distance(x, distance_type="KS", initial=None, bounds=None):
    # initial guess for (mu, sigma)
    if initial is None:
        mu0 = np.mean(x)
        sigma0 = np.std(x, ddof=1)
        initial = np.array([mu0, max(sigma0, 1e-6)])
    if bounds is None:
        # sigma must be positive
        bounds = [(None, None), (1e-9, None)]
    if distance_type == "KS":

        def dist_func(p):
            return ks_distance(p, x)
    elif distance_type == "CvM" or distance_type == "CvM_distance":

        def dist_func(p):
            return cvm_distance(p, x)
    else:
        raise ValueError("Unsupported distance_type. Choose 'KS' or 'CvM'.")

    method = "nelder-mead"
    res = minimize(dist_func, initial, bounds=bounds, method=method)

    mu_hat, sigma_hat = res.x
    return {
        "mu_hat": float(mu_hat),
        "sigma_hat": float(sigma_hat),
        "distance": float(res.fun),
        "success": bool(res.success),
        "nit": int(res.nit) if hasattr(res, "nit") else None,
    }


# Example usage
if __name__ == "__main__":
    # generate some sample data
    seed = np.random.randint(0, 10000)
    rng = np.random.default_rng(seed)
    n = 200
    true_mu = 2.5
    true_sigma = 1.2
    x = rng.normal(loc=true_mu, scale=true_sigma, size=n)

    # Fit by KS distance
    res_ks = fit_normal_by_distance(x, distance_type="KS")
    print("KS fitting results:")
    print(res_ks)

    # Fit by CvM distance
    res_cvm = fit_normal_by_distance(x, distance_type="CvM")
    print("CvM fitting results:")
    print(res_cvm)