In [None]:
from datetime import date
import random
import time
import yfinance as yf
import pandas as pd

import seaborn as sns

import matplotlib.pyplot as plt
import matplotlib.colors as mcolors

from numpy.fft import fft, ifft, fftshift
import numpy as np
from numpy import log, sqrt, exp


from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_absolute_percentage_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.mixture import GaussianMixture


from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.stats.diagnostic import acorr_ljungbox

import scipy.stats as stats
from scipy.stats import probplot, laplace, norm, t, poisson
from scipy.linalg import solve_banded
from scipy.optimize import minimize, differential_evolution
from scipy.integrate import quad
from scipy.special import roots_laguerre
from scipy.interpolate import interp1d
from scipy.sparse import diags, kron, identity, csr_matrix
from scipy.sparse.linalg import spsolve
from scipy.stats import multivariate_normal, kstest

import statsmodels.api as sm
from statsmodels.nonparametric.kde import KDEUnivariate
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.arima_process import ArmaProcess

import pymc as pm
import arviz as az

#import aesara.tensor as at

from tensorflow import keras
#from tensorflow.keras.utils import plot_model

#import pyswarms as ps

######################################
#from pmdarima import auto_arima
#from diptest import diptest



In [None]:
class pf_analysis:
    def __init__(self, pf, fit_type):
        self.pf = pf
        self.trading_days = 252
        self.fit_type = fit_type

    def check_plot(self):
        # Plot all columns
        ax = self.pf.plot(figsize=(14, 6), title="All Stock Time Series from portfolio")
        ax.legend_.remove()

        ax.set_xlabel("Date")
        ax.set_ylabel("Value")
        ax.grid(True)
        plt.tight_layout()
        plt.show()

    def CAGR(self, years=10):
        """
        Calculate CAGR (Compound Annual Growth Rate) over the past 'years' years
        for each column in the portfolio, ignoring NaNs. Returns sorted CAGR in %.
        """
        end_date = self.pf.index.max()
        start_date = end_date - pd.DateOffset(years=years)
        df_window = self.pf.loc[start_date:end_date]

        def _cagr(series):
            # Drop NaNs and convert to numeric
            series = pd.to_numeric(series, errors="coerce").dropna()
            if series.size < 2:
                return np.nan

            start_val = series.iloc[0]
            end_val = series.iloc[-1]
            start_date = series.index[0]
            end_date = series.index[-1]

            n_years = (end_date - start_date).days / 365.25
            if start_val <= 1e-10 or end_val <= 1e-10 or n_years < years:
                return np.nan

            return ((end_val / start_val) ** (1 / n_years) - 1) * 100  # CAGR in %

        cagr_series = df_window.apply(_cagr).dropna().sort_values()

        # Plot bar chart
        plt.figure(figsize=(14, 6))
        cagr_series.plot(kind='bar', color='skyblue', edgecolor='black')
        plt.title(f"{years}-Year CAGR per Stock")
        plt.ylabel("CAGR (%)")
        plt.xlabel("Stock")
        plt.grid(True, axis='y', linestyle='--', alpha=0.6)
        plt.tight_layout()
        plt.show()

    def log_return_val(self):
        log_returns = pd.DataFrame()

        for col in self.pf.columns:
            series = pd.to_numeric(self.pf[col], errors='coerce')
            first_valid = series[series > 0].first_valid_index()
            if first_valid is not None:
                trimmed = series.loc[first_valid:]
                log_ret = np.log(trimmed / trimmed.shift(1)).dropna()
                log_returns[col] = log_ret

        return log_returns

##################################################################################################################################################
    #FIT MODULES

    def fit_marginal_univariate(self, log_returns):
        data = log_returns.dropna()
        x = np.linspace(data.min(), data.max(), 1000)  # For PDF curve
        if self.fit_type == "norm":
                mu, std = stats.norm.fit(data)
                ks = stats.kstest(data, 'norm', args=(mu, std)).statistic
                mu_vector, std_vector = np.array(mu), np.array(std)
                pdf = stats.norm.pdf(x, mu, std)
                dist_label = f"Normal Fit\nμ={float(mu*self.trading_days):.4f}, σ={float(std*np.sqrt(self.trading_days)):.4f}\nKS={float(ks):.4f}"

        elif self.fit_type == "t":
                df_t, loc_t, scale_t = stats.t.fit(data)
                ks = stats.kstest(data, 't', args=(df_t, loc_t, scale_t)).statistic
                std = scale_t * np.sqrt(df_t / (df_t - 2)) if df_t > 2 else np.nan
                mu_vector, std_vector = np.array(loc_t), np.array(std)
                pdf = stats.t.pdf(x, df_t, loc_t, scale_t)
                dist_label = (f"t-Fit\nμ={float(loc_t*self.trading_days):.4f}, σ={float(std*np.sqrt(self.trading_days)):.4f}, "f"df={float(df_t):.2f}\nKS={float(ks):.4f}")
        else:
                raise ValueError("fit_type must be 'norm' or 't'")

        mu_annual = mu_vector * self.trading_days
        std_annual = std_vector * np.sqrt(self.trading_days)

        if self.fit_type == 'norm':
            args = mu_annual, std_annual
        elif self.fit_type == 't':
            args = mu_annual, std_annual, df_t

        # Plot histogram and fitted PDF
        plt.figure(figsize=(10, 5))
        plt.hist(data, bins=40, density=True, alpha=0.6, color='skyblue', label='Empirical')
        plt.plot(x, pdf, 'r-', lw=2, label='Fitted PDF')
        plt.title("Log Returns with Fitted Distribution")
        plt.xlabel("Log Return")
        plt.ylabel("Density")
        plt.legend(loc='upper right')
        plt.text(0.05, 0.95, dist_label, transform=plt.gca().transAxes,
                fontsize=10, verticalalignment='top', bbox=dict(boxstyle="round", facecolor='white', alpha=0.7))
        plt.grid(True)
        plt.tight_layout()
        plt.show()

        return args

    def fit_marginal_multivariate(self, log_returns):
        mu_vector = []
        std_vector = []
        ks_stats = {}

        for col in log_returns.columns:
            data = log_returns[col].dropna()

            if self.fit_type == "norm":
                mu, std = stats.norm.fit(data)
                ks = stats.kstest(data, 'norm', args=(mu, std)).statistic
                mu_vector.append(mu)
                std_vector.append(std)
                ks_stats[col] = ks

            elif self.fit_type == "t":
                df_t, loc_t, scale_t = stats.t.fit(data)
                ks = stats.kstest(data, 't', args=(df_t, loc_t, scale_t)).statistic
                mu_vector.append(loc_t)
                std = scale_t * np.sqrt(df_t / (df_t - 2)) if df_t > 2 else np.nan
                std_vector.append(std)
                ks_stats[col] = ks

            else:
                raise ValueError("fit_type must be 'norm' or 't'")

        # Convert to numpy arrays
        mu_vector = np.array(mu_vector)
        std_vector = np.array(std_vector)

        mu_annual = mu_vector * self.trading_days
        std_annual = std_vector * np.sqrt(self.trading_days)

        # Get sample Pearson correlation matrix
        corr_matrix = log_returns.corr().values

        # Construct covariance matrix: cov_ij = corr_ij * std_i * std_j
        cov_annual = np.outer(std_annual, std_annual) * corr_matrix

        if self.fit_type == 'norm':
            args = mu_annual, cov_annual
        elif self.fit_type == 't':
            args = mu_annual, cov_annual, df_t

        self.plot_multivariate_fit(list(log_returns.columns), list(ks_stats.values()), mu_annual, np.sqrt(np.diag(cov_annual)), corr_matrix)

        return args

    def fit_joint(self, log_returns):
        log_returns = log_returns.dropna()

        if self.fit_type == "norm":
            mu, cov = self.fit_multivariate_normal(log_returns)
            mu_annual, cov_annual = mu * self.trading_days, cov * self.trading_days
            ks_stat = self.multivariate_ks_stat(log_returns.values, (mu, cov))
            args = (mu_annual, cov_annual)

        elif self.fit_type == "t": ## Rosenblatt transform for CDFs since no closed form solution for multivariate t

            #df, mu, cov = self.fit_multivariate_t(log_returns) #log likelihood --> difficult to converge
            df, mu, cov = MultivariateT_EM().fit(log_returns) #---> EM algorithm

            mu_annual, cov_annual = mu * self.trading_days, cov * self.trading_days
            ks_stat = self.multivariate_ks_stat(log_returns.values, (mu, cov, df))
            args = (mu_annual, cov_annual, df)

        else:
            raise ValueError("fit_type must be 'norm' or 't'")

        ##correlation matrix
        std = np.sqrt(np.diag(cov_annual))
        log_corr = cov_annual / np.outer(std, std)
        self.plot_multivariate_fit(list(log_returns.columns), np.ones(len(mu_annual))*ks_stat, mu_annual, np.sqrt(np.diag(cov_annual)), log_corr )

        return args


    #############################################################
    def fit_multivariate_normal(self, log_returns):
        mu = log_returns.mean().values
        cov = log_returns.cov().values
        mvn = multivariate_normal(mean=mu, cov=cov)

        return mvn.mean, mvn.cov

    def fit_multivariate_t(self, log_returns):
        def neg_log_likelihood(params):
            df = params[0]
            if df <= 2:
                return np.inf
            d = log_returns.shape[1]
            mu = params[1:d+1]
            L = np.zeros((d, d))
            tril_indices = np.tril_indices(d)
            L[tril_indices] = params[d+1:]
            sigma = L @ L.T
            inv_sigma = np.linalg.inv(sigma)
            x = log_returns.values - mu
            term = (1 + (x @ inv_sigma * x).sum(axis=1) / df)
            return -np.sum(stats.t.logpdf(term, df=df, loc=0, scale=1))

        d = log_returns.shape[1]
        mu0 = log_returns.mean().values
        sigma0 = log_returns.cov().values
        L0 = np.linalg.cholesky(sigma0)
        x0 = np.concatenate([[5], mu0, L0[np.tril_indices(d)]])
        res = minimize(neg_log_likelihood, x0, method='L-BFGS-B')

        if res.success:
            df = res.x[0]
            mu = res.x[1:d+1]
            L = np.zeros((d, d))
            L[np.tril_indices(d)] = res.x[d+1:]
            cov = L @ L.T
            return df, mu, cov
        else:
            raise RuntimeError("Multivariate t fit did not converge")

    def multivariate_ks_stat(self, data, args):
        n, d = data.shape
        if self.fit_type == 'norm':
            mu, cov = args
            data_cdf = multivariate_normal(mean=mu, cov=cov).cdf(data)
        elif self.fit_type == 't':
            mu, cov, df = args
            # Transform to standard t (centered & whitened)
            L = np.linalg.cholesky(cov)
            z = np.linalg.solve(L, (data - mu).T).T
            u = stats.t.cdf(z, df=df)
            data_cdf = np.mean(u, axis=1)
        else:
            raise ValueError("Unsupported distribution type")

        ks_stat, _ = kstest(data_cdf, 'uniform')
        return ks_stat
#########################################################################################################################

    def plot_multivariate_fit(self, asset_names, ks_values, mu_annual, sigma_annual, log_corr):

        # Create subplots
        fig, axes = plt.subplots(3, 1, figsize=(14, 16))

        # --- Subplot 1: Annualized Mean ± 1 Std Dev ---
        axes[0].errorbar(asset_names, mu_annual*100, yerr=sigma_annual*100, fmt='o', capsize=5, color='dodgerblue', label="Annual Mean ± 1 Std")
        axes[0].set_title(f"Annualized Mean ± 1 Std Dev ({self.fit_type})")
        axes[0].set_ylabel("Annual Return")
        axes[0].grid(True)
        axes[0].legend()

        # --- Subplot 2: KS Statistic ---
        axes[1].bar(asset_names, ks_values, color='mediumpurple')
        axes[1].axhline(0.05, color='green', linestyle='--', label='Excellent Fit (<0.05)')
        axes[1].axhline(0.10, color='orange', linestyle='--', label='Acceptable Fit (<0.10)')
        axes[1].set_title(f"Marginal KS Statistics ({self.fit_type})")
        axes[1].set_ylabel("KS Statistic")
        axes[1].legend()
        axes[1].grid(True)

        # --- Subplot 3: Correlation Heatmap ---
        sns.heatmap(log_corr, ax=axes[2], cmap="coolwarm", annot=True, fmt=".2f", center=0,
                    xticklabels=asset_names, yticklabels=asset_names)
        axes[2].set_title("Pearson Correlation Matrix of Log Returns")

        plt.tight_layout()
        plt.show()

In [None]:
import numpy as np
from scipy.special import digamma, gammaln
from scipy.optimize import minimize_scalar
from numpy.linalg import inv, slogdet

class MultivariateT_EM:
    def __init__(self, max_iter=100, tol=1e-6, df_bounds=(2.01, 100)):
        self.max_iter = max_iter
        self.tol = tol
        self.df_bounds = df_bounds  # df must be > 2

    def _log_likelihood(self, X, mu, Sigma, df):
        n, d = X.shape
        inv_Sigma = inv(Sigma)
        x_mu = X - mu
        quad_form = np.sum(x_mu @ inv_Sigma * x_mu, axis=1)
        _, logdet = slogdet(Sigma)
        ll = (
            gammaln((df + d) / 2) - gammaln(df / 2)
            - 0.5 * logdet
            - (d / 2) * np.log(df * np.pi)
            - ((df + d) / 2) * np.log(1 + quad_form / df)
        )
        return np.sum(ll)

    def _optimize_df(self, w, d):
        def objective(df):
            if df <= 2:
                return np.inf
            term = -np.mean(np.log(w) - w)  # expectation term
            val = np.log(df / 2) - digamma(df / 2) + term + 1
            return val**2

        res = minimize_scalar(objective, bounds=self.df_bounds, method='bounded')
        return res.x

    def fit(self, data, df_init=10):
        X = np.asarray(data)
        n, d = X.shape
        mu = X.mean(axis=0)
        Sigma = np.cov(X.T)
        df = df_init

        for iteration in range(self.max_iter):
            inv_Sigma = inv(Sigma)
            x_mu = X - mu
            quad_form = np.sum(x_mu @ inv_Sigma * x_mu, axis=1)
            w = (df + d) / (df + quad_form)

            mu_new = np.sum(w[:, None] * X, axis=0) / np.sum(w)
            x_mu = X - mu_new
            Sigma_new = (x_mu.T * w) @ x_mu / n
            df_new = self._optimize_df(w, d)

            # Convergence
            if (
                np.linalg.norm(mu - mu_new) < self.tol
                and np.linalg.norm(Sigma - Sigma_new) < self.tol
                and abs(df - df_new) < self.tol
            ):
                break

            mu, Sigma, df = mu_new, Sigma_new, df_new

        self.mu_, self.cov_, self.df_ = mu, Sigma, df
        self.loglike_t = self._log_likelihood(X, mu, Sigma, df)
        return df, mu, Sigma

    def log_likelihood_normal(self, data):
        X = np.asarray(data)
        n, d = X.shape
        mu = X.mean(axis=0)
        Sigma = np.cov(X.T)
        x_mu = X - mu
        inv_Sigma = inv(Sigma)
        _, logdet = slogdet(Sigma)
        quad_form = np.sum(x_mu @ inv_Sigma * x_mu, axis=1)
        ll = -0.5 * (d * np.log(2 * np.pi) + logdet + quad_form)
        return np.sum(ll)
