In [None]:
import pandas as pd
import numpy as np
from qi4m.db import *
from scipy.sparse import csr_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from scipy.sparse.csgraph import dijkstra
import plotly.express as px
import plotly.io as pio
pio.renderers.default = 'iframe'





def download_returns_mantegna(isins:tuple, start_date:str, end_date:str, pivoted:bool = True)->pd.DataFrame:
    """
    Download returns given a collection of ISINs between start_date and end_date (both inclusive).

    Duplicate (pricedate, isin) rows are dropped (first occurrence kept) and rows whose
    pricedate falls on a Saturday or Sunday are removed.

    Args:
        isins: ISINs for query; any iterable of strings, or a single ISIN string
        start_date: starting date
        end_date: ending date
        pivoted: if True the table is pivoted to have dates as index and isins in columns
    Return:
        df with returns, pivoted if pivoted=True
    Raises:
        ValueError: if isins is empty
    """

    # A bare string would otherwise be iterated character by character.
    if isinstance(isins, str):
        isins = (isins,)

    # Build the IN list explicitly: interpolating the tuple itself renders a one-element
    # tuple as "('X',)", which is not valid SQL. Embedded single quotes are doubled.
    isin_literals = ", ".join("'" + str(isin).replace("'", "''") + "'" for isin in isins)
    if not isin_literals:
        raise ValueError("isins must contain at least one ISIN")

    # NOTE(review): start_date and end_date are still interpolated verbatim; switch to bound
    # parameters if read_sql supports them - TODO confirm.
    rets = read_sql(f"""
        SELECT PRICEDATE, ISIN, TOTALRETURN_USD, COUNTRY, SECTOR, COMPANYID, SECURITYID, TRADINGITEMID, TICKERSYMBOL, COMPANYNAME
        FROM CLARIFI_DB.PRICES.SUBSET_PRICES
        WHERE
        PRICEDATE >= '{start_date}'
        AND PRICEDATE <= '{end_date}'
        AND ISIN IN ({isin_literals})
        """)

    rets.columns = [col.lower() for col in rets.columns]
    rets['totalreturn_usd'] = rets['totalreturn_usd'].astype(float)
    rets.drop_duplicates(subset=['pricedate', 'isin'], inplace=True)
    # dayofweek: Monday=0 ... Sunday=6, so "< 5" keeps Monday-Friday only.
    rets = rets[rets['pricedate'].dt.dayofweek < 5].reset_index(drop=True)

    if pivoted:
        return rets.pivot(index='pricedate', columns='isin', values='totalreturn_usd')
    return rets





def distance_matrix_from_correlations(df:pd.DataFrame)->pd.DataFrame:
    """
    Turn a table of returns into the correlation-based distance of Mantegna (1999) -
    Hierarchical structure in financial markets: d = sqrt(2 * (1 - rho)).

    Args:
        df: pd.DataFrame with stocks returns. Each column is a stock and index are dates

    Return:
        distance_matrix: pd.DataFrame with pairwise distances; rows and columns that
        are entirely NaN are removed
    """

    correlations = df.corr()
    distances = correlations.rsub(1).mul(2).pow(0.5)

    # Drop stocks for which no distance could be computed at all.
    without_empty_rows = distances.dropna(axis='index', how='all')
    distance_matrix = without_empty_rows.dropna(axis='columns', how='all')

    return distance_matrix





def find_minimum_spanning_tree(distance_matrix:pd.DataFrame)->pd.DataFrame:
    """
    Compute Minimum Spanning Tree matrix from distance matrix. The matrix is a strictly upper triangular matrix.

    Only the entries strictly above the diagonal are used as edges. Entries that are NaN
    (undefined distance) or exactly 0 are not stored in the sparse graph and therefore are
    not available as edges; if the remaining edges do not connect every stock the result
    is a spanning forest rather than a single tree.

    Args:
        distance_matrix: pd.DataFrame distance matrix
    Return:
        mst: pd.DataFrame for Minimum Spanning Tree, with the same index and columns as
        distance_matrix; non-zero cells are the weights of the selected edges
    """

    # np.triu returns a new array, so the caller's data is never modified. Zeroing the
    # diagonal and lower triangle (instead of masking them with NaN) keeps them out of the
    # sparse graph: NaN is "non-zero" and would otherwise be stored as an explicit edge.
    upper = np.triu(distance_matrix.to_numpy(dtype=float), k=1)
    upper[np.isnan(upper)] = 0.0

    tree = minimum_spanning_tree(csr_matrix(upper))
    mst = pd.DataFrame(tree.toarray(), index=distance_matrix.index, columns=distance_matrix.columns)

    return mst





def generate_predecessors_matrix(mst:pd.DataFrame)->pd.DataFrame:
    """
    Using Dijkstra's Algorithm to find shortest path between stocks in a Minimum Spanning Tree, generate the matrix of predecessors.

    Cell [i, j] holds the label of the stock visited immediately before j on the path
    from i to j. Cells with no predecessor (i == j, or j not reachable from i) keep
    scipy's sentinel value -9999.

    Args:
        mst: pd.DataFrame for Minimum Spanning Tree
    Return:
        predecessors: pd.DataFrame for matrix of predecessors (object dtype), with the
        same index and columns as mst
    """

    # The path lengths returned alongside the predecessors are not needed here.
    _, predecessor_positions = dijkstra(
        csgraph=csr_matrix(mst.values),    # Calculate distance on MST
        directed=False,    # The MST is undirected, hence must be False
        return_predecessors=True
        )

    # Translate integer positions into column labels in one vectorised lookup,
    # leaving the -9999 sentinel untouched.
    labels = mst.columns.to_numpy(dtype=object)
    has_predecessor = predecessor_positions >= 0
    named = np.full(predecessor_positions.shape, -9999, dtype=object)
    named[has_predecessor] = labels[predecessor_positions[has_predecessor]]

    predecessors = pd.DataFrame(named, index=mst.index, columns=mst.columns)

    return predecessors





def find_path_between_stocks(go_from:str, go_to:str, predecessors:pd.DataFrame)->list:
    """
    Find the shortest path on the Minimum Spanning Tree given a matrix of predecessors

    Args:
        go_from: starting point
        go_to: ending point
        predecessors: pd.DataFrame for matrix of predecessors, where cell [i, j] is the
            stock visited just before j on the path from i, or -9999 when there is none
    Return:
        steps: list of (from, to) tuples, ordered from starting point to ending point
    Raises:
        ValueError: if go_from equals go_to, or if predecessors records no path
            between them
    """

    if predecessors.loc[go_from, go_to] == -9999:
        # -9999 marks both "same node" and "unreachable"; report which one it is.
        if go_from == go_to:
            raise ValueError(f"Trying to go from {go_from} to {go_to}. You can't have a starting point equal to destination")
        raise ValueError(f"Trying to go from {go_from} to {go_to}. No path connects them in the matrix of predecessors")

    # Walk backwards from the destination until the starting point is reached.
    path = [go_to]
    current = go_to
    while True:
        previous = predecessors.loc[go_from, current]
        if previous == go_from:
            break
        path.append(previous)
        current = previous

    path.append(go_from)
    path.reverse()

    # Pair consecutive nodes into (from, to) steps.
    steps = list(zip(path[:-1], path[1:]))

    return steps





def ultrametric_distance_matrix(distance_matrix:pd.DataFrame, predecessors:pd.DataFrame)->pd.DataFrame:
    """
    Compute Ultrametric Distance matrix from a Distance matrix as in Mantegna (1999) - Hierarchical structure in financial markets.

    For every ordered pair (i, j) with i != j, the value is the largest single-step
    distance found along the path from i to j given by find_path_between_stocks.
    The diagonal is set to 0.

    Args:
        distance_matrix: pd.DataFrame distance matrix
        predecessors: pd.DataFrame for matrix of predecessors
    Return:
        ultra_dist: pd.DataFrame ultrametric distance matrix with the same labels as
        distance_matrix and with index/columns names cleared
    """

    # Fill a plain NumPy array and wrap it at the end: writing through DataFrame.values
    # is not reliable because that array can be read-only (pandas Copy-on-Write).
    values = np.full(distance_matrix.shape, np.nan)
    for row_pos, i in enumerate(distance_matrix.index):
        for col_pos, j in enumerate(distance_matrix.columns):
            if i == j:
                continue
            steps = find_path_between_stocks(i, j, predecessors)
            # The ultrametric distance is the longest hop along the path.
            values[row_pos, col_pos] = max(distance_matrix.loc[a, b] for a, b in steps)

    np.fill_diagonal(values, 0)

    ultra_dist = pd.DataFrame(values, index=distance_matrix.index, columns=distance_matrix.columns)
    # rename_axis returns a new object, so the axis names of distance_matrix are left intact.
    ultra_dist = ultra_dist.rename_axis(index=None, columns=None)

    return ultra_dist





def mantegna_hierarchical_structure(pivot_returns:pd.DataFrame)->tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, dict]:
    """
    Run the full pipeline for the hierarchical arrangement of stocks based on the subdominant
    ultrametric distance, as in Mantegna (1999) - Hierarchical structure in financial markets.
    A progress message is printed after each stage.

    Args:
        pivot_returns: df with pivoted returns and date as index. Each column is a stock
    Return:
        dist: pd.DataFrame with distances
        mst: pd.DataFrame for Minimum Spanning Tree
        predecessors: pd.DataFrame for matrix of predecessors
        ultrametric_dist_matrix: matrix of ultrametric distance
        group_of_stocks: dict with pairs of stocks grouped by percentile bands of ultrametric distance
    """

    # Stage 1: correlation-based distances between every pair of stocks
    distances = distance_matrix_from_correlations(pivot_returns)
    print('Distance Matrix done')

    # Stage 2: Minimum Spanning Tree over those distances
    spanning_tree = find_minimum_spanning_tree(distances)
    print('MST done')

    # Stage 3: predecessors, used to walk the shortest path between two stocks along the tree
    predecessor_matrix = generate_predecessors_matrix(spanning_tree)
    print('Predecessors done')

    # Stage 4: subdominant ultrametric distances derived from those paths
    ultrametric = ultrametric_distance_matrix(distances, predecessor_matrix)
    print('Ultrametric Distance Matrix done')

    # Stage 5: bucket the pairs; the grouping works on its own copy of the matrix
    pair_groups = group_by_percentiles(ultrametric.copy())
    print('Grouping by Percentiles done')

    return distances, spanning_tree, predecessor_matrix, ultrametric, pair_groups





def group_by_percentiles(ultra_dist:pd.DataFrame, percentile_edges:tuple = (0, 1, 2, 3, 4, 5, 10, 20, 50))->dict:
    """
    Group pairs of stocks into percentile bands of their ultrametric distance.

    Only cells strictly above the diagonal are used, so each pair appears once. Each pair
    is reported as a (column label, row label) tuple.

    Args:
        ultra_dist: pd.DataFrame ultrametric distance matrix
        percentile_edges: increasing percentiles delimiting the bands. Consecutive edges
            (lo, hi) give a band 'ultradist_pctl_{lo}_{hi}' holding pairs with
            percentile(lo) <= distance < percentile(hi); the last edge also gives
            'ultradist_pctl_above{last}' holding pairs with distance >= percentile(last)
    Return:
        group_of_stocks: dict mapping band name to the list of pairs in that band
    Raises:
        ValueError: if percentile_edges is empty
    """

    edges = list(percentile_edges)
    if not edges:
        raise ValueError("percentile_edges must contain at least one percentile")

    # Clear axis names on a copy so the result does not depend on how the input axes are named.
    unnamed = ultra_dist.rename_axis(index=None, columns=None)

    # Blank out the diagonal and everything below it, then flatten to one value per pair.
    on_or_below_diagonal = np.tril(np.ones(unnamed.shape, dtype=bool))
    pairs = unnamed.mask(on_or_below_diagonal).unstack().dropna()
    values = pairs.to_numpy(dtype=float)

    # Compute each cut-off once; with no pairs at all every band is simply empty.
    if values.size:
        cutoffs = {q: np.percentile(values, q) for q in edges}
    else:
        cutoffs = {q: np.nan for q in edges}

    group_of_stocks = {}
    for lo, hi in zip(edges[:-1], edges[1:]):
        in_band = (values >= cutoffs[lo]) & (values < cutoffs[hi])
        group_of_stocks[f'ultradist_pctl_{lo}_{hi}'] = pairs.index[in_band].to_list()

    top = edges[-1]
    group_of_stocks[f'ultradist_pctl_above{top}'] = pairs.index[values >= cutoffs[top]].to_list()

    return group_of_stocks