In [None]:
import pandas as pd
import numpy as np
from scipy.special import gamma, digamma, polygamma
from scipy.stats import kstest
from scipy.integrate import quad
from scipy import stats
import warnings
from sklearn.neighbors import KernelDensity, NearestNeighbors
from scipy.signal import find_peaks

warnings.filterwarnings("ignore", category=RuntimeWarning)

def safe_log(x, epsilon=1e-10):
    return np.log(np.maximum(x, epsilon))

def h(x, mu, sigma, r, b, epsilon=1e-10):
    b_safe = np.maximum(b, epsilon)
    sigma_safe = np.maximum(sigma, epsilon)
    term1 = b_safe / (gamma(1 / b_safe) * sigma_safe * 2 ** (1 + 1 / b_safe))
    sign_term = 1 + r * np.sign(x - mu)
    sign_term = np.maximum(sign_term, epsilon)
    b_safe = np.maximum(b_safe, epsilon)
    denominator = 2 * (sigma_safe ** b_safe) * sign_term ** b_safe
    abs_diff = np.abs(x - mu) ** b_safe
    term2 = np.exp(-abs_diff / np.maximum(denominator, epsilon))
    return term1 * term2

def weight(x, mu, sigma, r, b):
    epsilon = 1e-8
    max_value = 1e4
    base_abs = np.maximum(np.abs(x - mu), epsilon)
    base_sgn = np.maximum(np.abs(1 + r * np.sign(x - mu)), epsilon)
    base_abs = np.minimum(base_abs, max_value)
    base_sgn = np.minimum(base_sgn, max_value)
    weight_value = (base_abs ** (b - 2) + epsilon) / (base_sgn ** b + epsilon)
    weight_value = np.minimum(weight_value, max_value)
    return weight_value

def update_pi(q):
    return np.mean(q, axis=0)

def update_mu(data, q, mu, sigma, r, b):
    weights = np.zeros(len(q))
    result = 0
    result2 = 0
    for i in range(len(q)):
        weights[i] = weight(data[i], mu, sigma, r, b)
        result += q[i] * weights[i]
        result2 += q[i] * weights[i] * data[i]
    return result2 / result

def update_sigma(data, pi, q, mu, sigma, r, b):
    weights = np.zeros(len(q))
    result = 0
    for i in range(len(q)):
        weights[i] = weight(data[i], mu, sigma, r, b)
        result += q[i] * weights[i] * ((data[i] - mu) ** 2)
    denominator = 2 * len(q) * pi
    return (b * result / denominator) ** (1 / b)

def update_r(data, q, mu, r, b):
    epsilon = 1e-8
    numerator = 0
    denominator = 0
    for i in range(len(q)):
        pos_part = (data[i] >= mu).astype(float) * ((np.maximum(data[i] - mu, 0)) ** b)
        neg_part = (data[i] < mu).astype(float) * ((np.maximum(mu - data[i], 0)) ** b)
        numerator += q[i] * pos_part
        denominator += q[i] * neg_part
    if denominator < epsilon:
        return 0
    log_numerator = np.log(numerator + epsilon)
    log_denominator = np.log(denominator + epsilon)
    ratio = np.exp((log_numerator - log_denominator) / (b + 1))
    r_new = 1 - 2 / (ratio + 1)
    r_new = np.clip(r_new, -1, 1)
    return r_new

def f(x, mu, sigma, r, b, epsilon=1e-8):
    return np.abs(x - mu) / (sigma * (1 + r * np.sign(x - mu)) + epsilon)

def update_b(data, pi, q, mu, sigma, r, b, max_iter=10):
    term2 = 0
    term4 = 0
    for i in range(len(q)):
        f_value = f(data[i], mu, sigma, r, b)
        log_f_value = safe_log(f_value)
        f_value_clipped = np.clip(f_value, 1e-5, 1e5)
        log_f_value_clipped = np.clip(log_f_value, -1e5, 1e5)
        term2 += q[i] * (f_value_clipped ** b) * log_f_value_clipped
        term4 += q[i] * (f_value_clipped ** b) * log_f_value_clipped ** 2
    term1 = (1 / b + np.log(2) / (b ** 2) + digamma(1 / b) / (b ** 2))
    G = len(data) * pi * term1 - 0.5 * term2
    term3 = (1 / (b ** 2) + 2 * np.log(2) / (b ** 3) + 2 * digamma(1 / b) / (b ** 3) + polygamma(1, 1 / b) / (b ** 4))
    H = -len(data) * pi * term3 - 0.5 * term4
    b_new = b - G / H
    return b_new

def log_likelihood(data, q, pi, mu, sigma, r, b, epsilon=1e-10, max_value=1e5, max_exp=500):
    n, K = q.shape
    likelihood = 0
    for i in range(n):
        for j in range(K):
            ln_bj = safe_log(b[j], epsilon)
            ln_term = (1 + 1 / b[j]) * np.log(2)
            gamma_term = safe_log(gamma(1 / b[j]), epsilon)
            sigma_term = safe_log(sigma[j], epsilon)
            q_term = safe_log(np.maximum(q[i, j], epsilon), epsilon)
            abs_diff = (np.abs(data[i] - mu[j])) ** b[j]
            abs_diff = np.minimum(abs_diff, max_value)
            denominator = 2 * (sigma[j] ** b[j]) * ((1 + r[j] * np.sign(data[i] - mu[j])) ** b[j])
            denominator = np.minimum(denominator, max_value)
            term2 = np.exp(-np.minimum(abs_diff / np.maximum(denominator, epsilon), max_exp))
            likelihood += q[i, j] * (ln_bj - ln_term - gamma_term - sigma_term - q_term - term2)
    return likelihood

def em_algorithm(data, initial_params, max_iter=300, tol=1e-9, max_delta=0.1):
    pi, mu, sigma, r, b = initial_params
    K = len(pi)
    n = len(data)
    q = np.zeros((n, K))
    log_likelihood_prev = -np.inf
    for t in range(max_iter):
        for j in range(K):
            q[:, j] = pi[j] * h(data, mu[j], sigma[j], r[j], b[j])
        q_sum = q.sum(axis=1, keepdims=True)
        q_sum = np.maximum(q_sum, 1e-10)
        q /= q_sum
        pi_new = np.zeros(K)
        mu_new = np.zeros(K)
        sigma_new = np.zeros(K)
        r_new = np.zeros(K)
        b_new = np.zeros(K)
        for j in range(K):
            if K == 1:
                pi_new = [1.0]
            else:
                pi_new[j] = update_pi(q[:, j])
            mu_new[j] = update_mu(data, q[:, j], mu[j], sigma[j], r[j], b[j])
            sigma_new[j] = update_sigma(data, pi_new[j], q[:, j], mu_new[j], sigma[j], r[j], b[j])
            r_new[j] = update_r(data, q[:, j], mu_new[j], r[j], b[j])
            b_new[j] = update_b(data, pi_new[j], q[:, j], mu_new[j], sigma_new[j], r_new[j], b[j])
        pi = pi_new
        mu = mu_new
        sigma = sigma_new
        r = r_new
        b = b_new
        log_likelihood_curr = log_likelihood(data, q, pi, mu, sigma, r, b)
        if t > 0 and np.abs(log_likelihood_curr / log_likelihood_prev - 1) < tol:
            break
        log_likelihood_prev = log_likelihood_curr
    return pi, mu, sigma, r, b, log_likelihood_curr

def generalized_skew_normal_pdf(x, mu, sigma, b, r):
    normalization = b / (2 ** (1 + 1 / b) * gamma(1 / b) * sigma)
    h_val = np.abs(x - mu) / (sigma * (1 + r * np.sign(x - mu)))
    pdf = normalization * np.exp(-h_val ** b / 2)
    return pdf

def generalized_skew_normal_rvs(mu, sigma, b, r, size):
    samples = []
    while len(samples) < size:
        x_candidate = np.random.uniform(mu - 20 * sigma, mu + 20 * sigma)
        u = np.random.uniform(0, 1)
        if u < generalized_skew_normal_pdf(x_candidate, mu, sigma, b, r):
            samples.append(x_candidate)
    return np.array(samples)

def ks_test_mixture(data, pi, mu, sigma, b, r, size):
    n_components = len(b)
    samples = []
    for i in range(n_components):
        samples_i = generalized_skew_normal_rvs(mu[i], sigma[i], b[i], r[i], size=size)
        samples.append(samples_i)
    samples_mixed = []
    for i in range(n_components):
        num_samples = int(pi[i] * size)
        samples_mixed.append(samples[i][:num_samples])
    samples_mixed1 = np.concatenate(samples_mixed)
    ks_statistic, p_value = stats.ks_2samp(data, samples_mixed1)
    return ks_statistic, p_value

def sgn_log_likelihood(theta, x):
    mu, sigma, r, b = theta
    n = len(x)
    if sigma <= 0 or b <= 0:
        return -np.inf
    denominator = sigma * (1 + r * np.sign(x - mu))
    if np.any(denominator == 0):
        return -np.inf
    h_val = np.abs(x - mu) / denominator
    try:
        log_likelihood = (-n * ((1 + 1 / b) * np.log(2) + gamma(1 / b) + np.log(sigma) - np.log(b)) - np.sum(1 / 2 * h_val ** b))
    except ValueError:
        return -np.inf
    return log_likelihood

def detect_kde_peaks(samples_mixed, height=0.01, grid_n=1000):
    samples_mixed2 = np.array(samples_mixed).reshape(-1, 1)
    n_samples = len(samples_mixed2)
    std_dev = np.std(samples_mixed2)
    bandwidth_kde = 1.06 * std_dev * (n_samples ** (-1 / 5))
    kde = KernelDensity(bandwidth=bandwidth_kde).fit(samples_mixed2)
    x_vals = np.linspace(np.min(samples_mixed2), np.max(samples_mixed2), grid_n)
    log_density = kde.score_samples(x_vals.reshape(-1, 1))
    density = np.exp(log_density)
    peaks, props = find_peaks(density, height=height)
    peak_positions = x_vals[peaks]
    peak_heights = props.get("peak_heights", density[peaks])
    return samples_mixed2, peak_positions, peak_heights

def return_initialparams_for_K(samples_mixed, K_target, height=0.01):
    def half_range_mode_auto_h(data):
        data = np.sort(data)
        n = len(data)
        m = max(1, n // 2)
        data_range = np.max(data) - np.min(data)
        h_val = data_range / m
        max_count = 0
        mode_estimate = None
        for i in range(n):
            j = i
            while j < n and data[j] <= data[i] + h_val:
                j += 1
            count = j - i
            if count > max_count:
                max_count = count
                mode_estimate = (data[i] + data[j - 1]) / 2
        return mode_estimate, h_val

    samples_mixed2, peak_positions, peak_heights = detect_kde_peaks(samples_mixed, height=height)
    if len(peak_positions) == 0 or len(peak_positions) < K_target:
        peak_positions = np.array([np.mean(samples_mixed2)])
        K_target = 1
    order = np.argsort(-peak_heights)
    chosen = order[:K_target]
    peak_positions = peak_positions[chosen]
    nn = NearestNeighbors(n_neighbors=1)
    nn.fit(peak_positions.reshape(-1, 1))
    _, labels = nn.kneighbors(samples_mixed2)
    labels = labels.flatten()
    cluster_sizes = np.bincount(labels, minlength=K_target)
    total_points = len(samples_mixed2)
    pi_init = cluster_sizes / total_points
    pi_init = np.maximum(pi_init, 1e-12)
    pi_init = pi_init / np.sum(pi_init)
    mu_init = np.zeros(K_target, dtype=np.float64)
    sigma_init = np.zeros(K_target, dtype=np.float64)
    r_init = np.zeros(K_target, dtype=np.float64)
    b_init = np.zeros(K_target, dtype=np.float64)
    for k in range(K_target):
        cluster_points = samples_mixed2[labels == k].flatten()
        if len(cluster_points) == 0:
            mu_init[k] = float(np.mean(samples_mixed2))
            sigma_init[k] = float(np.std(samples_mixed2) + 1e-8)
            r_init[k] = 0.0
            b_init[k] = 2.0
            continue
        mode, _ = half_range_mode_auto_h(cluster_points)
        mu_init[k] = mode
        sigma_init[k] = np.sqrt(np.mean((cluster_points - mu_init[k]) ** 2))
        sigma_init[k] = max(sigma_init[k], 0.1)
        I = (cluster_points <= mu_init[k]).astype(int)
        r_init[k] = 1 - 2 * np.mean(I)
        b_values = np.linspace(0.1, 100, 10000)
        likelihoods = []
        for b_val in b_values:
            theta = (mu_init[k], sigma_init[k], r_init[k], b_val)
            likelihoods.append(sgn_log_likelihood(theta, cluster_points))
        b_init[k] = b_values[int(np.argmax(likelihoods))]
    return (pi_init, mu_init, sigma_init, r_init, b_init)

def fit_best_by_ks(data, samples_mixed, ks_size=2000, height=0.01, em_max_iter=300, em_tol=1e-9):
    _, peak_positions, _ = detect_kde_peaks(samples_mixed, height=height)
    k_peaks_raw = len(peak_positions)
    if k_peaks_raw <= 0:
        k_peaks_raw = 1
    results = []
    best = None
    for K in range(k_peaks_raw, 0, -1):
        initial_params = return_initialparams_for_K(samples_mixed, K_target=K, height=height)
        pi, mu, sigma, r, b, ll = em_algorithm(data, initial_params, max_iter=em_max_iter, tol=em_tol)
        ks_stat, p_value = ks_test_mixture(data, pi, mu, sigma, b, r, size=ks_size)
        rec = {"K": K, "p_value": p_value, "ks_stat": ks_stat, "pi": pi, "mu": mu, "sigma": sigma, "r": r, "b": b, "loglik": ll}
        results.append(rec)
        if best is None or p_value > best["p_value"]:
            best = rec
    for rec in results:
        print(f"K={rec['K']}, KS={rec['ks_stat']:.6g}, p-value={rec['p_value']:.6g}")
    print(f"\nBest: K={best['K']}, p-value={best['p_value']:.6g}, KS={best['ks_stat']:.6g}")
    return best, results, best["pi"], best["mu"], best["sigma"], best["r"], best["b"]
