In [1]:
from datetime import datetime
import pandas as pd
import ipywidgets as widgets
from ipydatagrid import DataGrid, TextRenderer
from IPython.display import display
from IPython.display import HTML

import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from RiskMetrics import RiskAnalysis,diversification_constraint, create_constraint
from Rebalancing import rebalanced_portfolio , buy_and_hold
from scipy.stats import norm, chi2,gumbel_l

In [2]:

def display_scrollable_df(df, max_height="50vh", max_width="90vw"):
    style = f"""
    <div style="
        display: flex;
        justify-content: center;
        padding: 20px;
    ">
        <div style="
            overflow: auto;
            max-height: {max_height};
            max-width: {max_width};
            width: 100%;
            border: 1px solid #444;
            padding: 10px;
            background-color: #000;
            color: #eee;
            font-family: 'Arial Narrow', Arial, sans-serif;
            box-sizing: border-box;
        ">
            {df.to_html(classes='table', border=0, index=True)}
        </div>
    </div>
    """
    return HTML(style)


In [3]:
def get_asset_returns(prices):
    
    ret=prices.iloc[-1]/prices.iloc[0]-1
    ytd=(1+ret)**(365/(prices.index[-1]-prices.index[0]).days)-1
    ret_ytd=prices.loc[datetime(max(prices.index.year), 1, 1):].iloc[-1]/prices.loc[datetime(max(prices.index.year),1,1):].iloc[0]-1

    perfs=pd.concat([ret,ret_ytd,ytd],axis=1)
    perfs.columns=['Returns since '+ pd.to_datetime(prices.index[0], format='%Y-%d-%m').strftime("%Y-%m-%d"),
              'Returns since '+datetime(max(prices.index.year), 1, 1).strftime("%Y-%m-%d"),
              'Annualized Returns']
    
    return perfs.T.round(4)

def get_asset_risk(prices):

    dates_drawdown=((prices-prices.cummax())/prices.cummax()).idxmin().dt.date
    
    vol=prices.pct_change().iloc[-260:].std()*np.sqrt(260)
    weekly_vol=prices.resample('W').last().iloc[-153:].pct_change().std()*np.sqrt(52)
    monthly_vol_1Y=prices.resample('ME').last().iloc[-50:].pct_change().std()*np.sqrt(12)
    monthly_vol_5Y=prices.resample('ME').last().iloc[-181:].pct_change().std()*np.sqrt(12)

    drawdown=pd.DataFrame((((prices-prices.cummax()))/prices.cummax()).min())
    Q=0.05
    intervals=np.arange(Q, 1, 0.0005, dtype=float)
    cvar=monthly_vol_5Y*norm(loc =0 , scale = 1).ppf(1-intervals).mean()/0.05

    risk=pd.concat([vol,weekly_vol,monthly_vol_1Y,monthly_vol_5Y,cvar,drawdown,dates_drawdown],axis=1).round(4)
    risk.columns=['Annualized Volatility (daily)',
    'Annualized Volatility 3Y (Weekly)',
    'Annualized Volatility 5Y (Monthly)','Annualized Volatility since 2020 (Monthly)',
    'CVar Parametric '+str(int((1-Q)*100))+'%','Max Drawdown','Date of Max Drawdown']
    
    
    return risk.T.round(4)

def get_expected_metrics(returns,dataframe):
    portfolio=RiskAnalysis(returns)
    allocation_dict={}

    for idx in dataframe.index:
        allocation_dict[idx]=dataframe.loc[idx].to_numpy()


    
    metrics={}
    metrics['Expected Returns']={}
    metrics['Expected Volatility']={}
    metrics['Sharpe Ratio']={}

    for key in allocation_dict:

        metrics['Expected Returns'][key]=(np.round(portfolio.performance(allocation_dict[key]), 4))
        metrics['Expected Volatility'][key]=(np.round(portfolio.variance(allocation_dict[key]), 4))
        sharpe_ratio=np.round(portfolio.performance(allocation_dict[key])/portfolio.variance(allocation_dict[key]),2)
        metrics['Sharpe Ratio'][key]=sharpe_ratio

    indicators = pd.DataFrame(metrics,index=allocation_dict.keys())

    return indicators.T.round(4)

def rebalanced_time_series(prices,dataframe,frequency='Monthly'):

    portfolio_returns=pd.DataFrame()

    for key in dataframe.index:
        portfolio_returns['Buy and Hold '+key]=buy_and_hold(prices, dataframe.loc[key]).sum(axis=1)
        portfolio_returns['Rebalanced '+key]=rebalanced_portfolio(prices, dataframe.loc[key],frequency=frequency).sum(axis=1)

    portfolio_returns.index.name='Date'
    
    return portfolio_returns

def rebalanced_metrics(portfolio_returns):

    ret=portfolio_returns.iloc[-1]/portfolio_returns.iloc[0]-1
    ytd=(1+ret)**(365/(portfolio_returns.index[-1]-portfolio_returns.index[0]).days)-1
    ret_ytd=portfolio_returns.loc[datetime(max(portfolio_returns.index.year),1,1):].iloc[-1]/portfolio_returns.loc[datetime(max(portfolio_returns.index.year),1,1):].iloc[0]-1

    perfs=pd.concat([ret,ret_ytd,ytd],axis=1)
    perfs.columns=['Returns since '+ pd.to_datetime(portfolio_returns.index[0], format='%Y-%d-%m').strftime("%Y-%m-%d"),
              'Returns since '+datetime(max(portfolio_returns.index.year), 1, 1).strftime("%Y-%m-%d"),
              'Annualized Returns']
    
    return perfs.T.round(4)


def get_portfolio_risk(dataframe,prices,portfolio_returns,benchmark):

    allocation_dict={}
    
    returns=prices.pct_change()
    
    for idx in dataframe.index:
        allocation_dict[idx]=dataframe.loc[idx].to_numpy()


    tracking_error_daily={}
    tracking_error_monthly={}
    monthly_returns=prices.resample('ME').last().iloc[-180:].pct_change()


    for key in allocation_dict:
        tracking_error_daily['Buy and Hold '+key]=RiskAnalysis(returns).variance(allocation_dict[key]-allocation_dict[benchmark])/np.sqrt(252)*np.sqrt(260)
        tracking_error_daily['Rebalanced '+key]=RiskAnalysis(returns).variance(allocation_dict[key]-allocation_dict[benchmark])/np.sqrt(252)*np.sqrt(260)
        tracking_error_monthly['Buy and Hold '+key]=RiskAnalysis(monthly_returns).variance(allocation_dict[key]-allocation_dict[benchmark])/np.sqrt(252)*np.sqrt(12)
        tracking_error_monthly['Rebalanced '+key]=RiskAnalysis(monthly_returns).variance(allocation_dict[key]-allocation_dict[benchmark])/np.sqrt(252)*np.sqrt(12)

    tracking_error_daily=pd.DataFrame(tracking_error_daily.values(),index=tracking_error_daily.keys(),columns=['Tracking Error (daily)'])
    tracking_error_monthly=pd.DataFrame(tracking_error_monthly.values(),index=tracking_error_monthly.keys(),columns=['Tracking Error (Monthly)'])

    dates_drawdown=((portfolio_returns-portfolio_returns.cummax())/portfolio_returns.cummax()).idxmin().dt.date
    
    vol=portfolio_returns.pct_change().iloc[:].std()*np.sqrt(260)
    monthly_vol=portfolio_returns.resample('ME').last().iloc[-50:].pct_change().std()*np.sqrt(12)

    drawdown=pd.DataFrame((((portfolio_returns-portfolio_returns.cummax()))/portfolio_returns.cummax()).min())
    Q=0.05
    intervals=np.arange(Q, 1, 0.0005, dtype=float)
    cvar=monthly_vol*norm(loc =0 , scale = 1).ppf(1-intervals).mean()/0.05

    risk=pd.concat([vol,tracking_error_daily,monthly_vol,tracking_error_monthly,cvar,drawdown,dates_drawdown],axis=1).round(4)
    risk.columns=['Annualized Volatility (daily)','TEV (daily)',
                  'Annualized Volatility (Monthly)','TEV (Monthly)',
                  'CVar Parametric '+str(int((1-Q)*100))+'%',
                  'Max Drawdown','Date of Max Drawdown']
    
    return risk.T.round(4)

In [4]:
def date_interval(prices_original):
    start_date = prices_original.index[0]
    end_date = prices_original.index[-1]
    
    dates = pd.date_range(start_date, end_date, freq='D')
    
    options = [(date.strftime(' %d %b %Y '), date) for date in dates]
    index = (0, len(options)-1)
    
    selection_range_slider = widgets.SelectionRangeSlider(
        options=options,
        index=index,
        description='Dates',
        orientation='horizontal',
        layout={'width': '500px'}
    )
    return selection_range_slider 

def update_price_window(prices_original):
    selection_range_slider = date_interval(prices_original)  # Ensure this function is correct and returns a date range slider
    output = widgets.Output()
    button = widgets.Button(description="Apply Date Range", button_style='primary')

    result_tuple = [None, None]  # (prices, returns)

    def on_button_click(b):
        with output:
            output.clear_output()  # Clear previous output
            selmind = selection_range_slider.value[0].date()
            selmaxd = selection_range_slider.value[1].date()

            # Ensure the selected dates are valid
            if selmind > selmaxd:
                print("Start date must be earlier than end date.")
                return
            
            # Filter the prices by the selected date range
            prices = prices_original.loc[selmind:selmaxd]
            returns = prices.pct_change().dropna()

            result_tuple[0] = prices
            result_tuple[1] = returns
    
    button.on_click(on_button_click)

    display(widgets.VBox([selection_range_slider, button, output]))

    return result_tuple

In [5]:
def build_constraint(prices_original, constraint_matrix):
    constraints = []
    dico_map = {'=': 'eq', '≥': 'ineq', '≤': 'ineq'}

    drop_down_list_asset = list(prices_original.columns) + ['All']
    drop_down_list_sector = list(transparency_table.index)
    drop_down_list = drop_down_list_asset + drop_down_list_sector + [None]

    try:
        for row in range(constraint_matrix.shape[0]):
            temp = constraint_matrix[row, :]
            ticker = temp[0]

            if ticker not in drop_down_list:
                continue

            sign = temp[1]
            limit = float(temp[2])

            if ticker == 'All':
                constraint = diversification_constraint(sign, limit)

            elif ticker in drop_down_list_asset:
                position = np.where(prices_original.columns == ticker)[0][0]
                constraint = create_constraint(sign, limit, position)

            elif ticker in drop_down_list_sector:
                position = np.where(transparency_table.index == ticker)[0][0]
                if sign == '≤':
                    constraint = [{
                        'type': dico_map[sign],
                        'fun': lambda weights, p=position, l=limit: l - (weights @ transparency_matrix)[p]
                    }]
                elif sign == '≥':
                    constraint = [{
                        'type': dico_map[sign],
                        'fun': lambda weights, p=position, l=limit: (weights @ transparency_matrix)[p] - l
                    }]
                else:  # '='
                    constraint = [{
                        'type': dico_map[sign],
                        'fun': lambda weights, p=position, l=limit: (weights @ transparency_matrix)[p] - l
                    }]

            constraints.extend(constraint)

    except Exception as e:
        print(f"Error in build_constraint: {e}")

    return constraints


In [6]:
def get_frontier(returns,dataframe):
    portfolio=RiskAnalysis(returns)
    frontier_weights, frontier_returns, frontier_risks, frontier_sharpe_ratio = portfolio.efficient_frontier()
    
    weight_matrix={}

    for idx in dataframe.index:
        
        weight_matrix[idx]=dataframe.loc[idx].to_numpy()
    
    metrics = {
        'Returns': {},
        'Volatility': {},
        'Sharpe Ratio': {}
    }
    for key in weight_matrix:
        
        metrics['Returns'][key]=(np.round(portfolio.performance(weight_matrix[key]), 4))
        metrics['Volatility'][key]=(np.round(portfolio.variance(weight_matrix[key]), 4))
        metrics['Sharpe Ratio'][key]=np.round(metrics['Returns'][key]/metrics['Volatility'][key],4)
    
    
    frontier = pd.DataFrame(
        {
            "Returns": frontier_returns,
            "Volatility": frontier_risks,
            "Sharpe Ratio": frontier_sharpe_ratio,
        }
    )
    
    fig = px.scatter(
        frontier,
        y="Returns",
        x="Volatility",
        color="Sharpe Ratio",
        color_continuous_scale='blues',
    )
    
    for key in weight_matrix:
    
        fig.add_scatter(
            x=[metrics["Volatility"][key]],
            y=[metrics["Returns"][key]],
            mode="markers",
            marker=dict(color="orange", size=8, symbol="x"),
            name=key,
        )
        
        
    fig.add_scatter(
        x=[metrics["Volatility"]['Optimal Portfolio']],
        y=[metrics["Returns"]['Optimal Portfolio']],
        mode="markers",
        marker=dict(color="red", size=8, symbol="x"),
        name='Optimal Portfolio',
    )
    
    fig.update_layout(
        showlegend=False, 
        hoverlabel_namelength=-1,
        font=dict(
            family="Arial Narrow",
            size=14,
            color="white" 
        ),
        plot_bgcolor="black", 
        paper_bgcolor="black"  
    )
    
    fig.update_layout(showlegend=False)
    fig.update_layout(hoverlabel_namelength=-1)
    indicators = pd.DataFrame(metrics,index=weight_matrix.keys()).T

    return indicators,fig
    
def get_correlation_matrix(returns):
    fig = px.imshow(returns.corr().round(2), color_continuous_scale='blues', text_auto=True, aspect="auto")
    fig.update_layout(plot_bgcolor="black", paper_bgcolor="black", font_color="white")  
    fig.update_traces(xgap=2, ygap=2)
    fig.update_traces(textfont=dict(family="Arial Narrow", size=15))
    fig.show()

In [28]:
def display_app(prices_original):

    prices = prices_original
    returns = prices_original.pct_change()

    # Asset Returns and Risk
    asset_returns = display_scrollable_df(get_asset_returns(prices))
    asset_risk = display_scrollable_df(get_asset_risk(prices))

    output_returns = widgets.Output()
    with output_returns:
        display(asset_returns)
        display(asset_risk)

    portfolio = RiskAnalysis(returns)

    output = widgets.Output()
    portfolio_rebalanced_output = widgets.Output()

    # Dropdowns for assets and constraints
    drop_down_list_asset = list(prices.columns) + ['All']
    drop_down_list_sector = list(transparency_table.index)
    drop_down_list = drop_down_list_asset + drop_down_list_sector + [None]
    constraints_options = ["=", "≥", "≤"]

    dropdown1 = widgets.Dropdown(description='Assets:', value=None, options=drop_down_list)
    dropdown2 = widgets.Dropdown(description='Sign:', options=constraints_options)
    dropdown3 = widgets.FloatText(description='Limit')

    data = []  # Constraint rows

    # Define the grid for the allocations
    grid = DataGrid(pd.DataFrame(), editable=True, layout={"height": "250px"})
    grid_button_output = widgets.Output()
    extra_controls_box = widgets.Output()

    frequency = widgets.Dropdown(
        options=['Monthly', 'Quarterly', 'Yearly'],
        value='Quarterly',
        description='Frequency:',
        disabled=False
    )

    benchmark = widgets.Dropdown(
        options=[],
        value=None,
        description='Benchmark:',
        disabled=False
    )

    def update_extra_controls(allocation_df):
        benchmark.options = allocation_df.index.tolist()
        benchmark.value = allocation_df.index[0] if not allocation_df.empty else None
        with extra_controls_box:
            extra_controls_box.clear_output()

    plot_output = widgets.Output()

    def on_add_constraint_clicked(b):
        # Add the new constraint to the list (data)
        row = {
            'Asset': dropdown1.value,
            'Sign': dropdown2.value,
            'Limit': dropdown3.value
        }
        data.append(row)  # Add constraint to the list
    
        with output:
            output.clear_output()
            display(pd.DataFrame(data))  # Display accumulated constraints
    
    # Button for adding constraints
    add_constraint_btn = widgets.Button(description='Add Constraint', button_style='success')
    add_constraint_btn.on_click(on_add_constraint_clicked)
    
    def on_optimize_clicked(b):
        # if not data:

        #     # with output:
        #     #     output.clear_output()
        #     #     print("No constraints to apply.")
        #     # return
    
        constraint_df = pd.DataFrame(data)    
        constraints = build_constraint(prices, constraint_df.to_numpy())
        
        # Run optimization without constraints
        optimized_weights = portfolio.optimize(objective="sharpe_ratio")
        
        # Run optimization with constraints
        optimized_weights_constraint = portfolio.optimize(objective="sharpe_ratio", constraints=constraints)
    
        # Create allocation DataFrame
        allocation = {
            'Optimal Portfolio': optimized_weights.tolist(),
            'Optimal Constrained Portfolio': optimized_weights_constraint.tolist()
        }
    
        allocation_dataframe = pd.DataFrame(allocation, index=returns.columns).T.round(4)
        grid.data = allocation_dataframe
    
        constraint_container = {'constraints': constraints, 'allocation_df': allocation_dataframe}
    
        update_extra_controls(allocation_dataframe)

    optimize_btn = widgets.Button(description='Optimize Portfolio', button_style='primary')
    optimize_btn.on_click(on_optimize_clicked)

    def on_clear_clicked(b):
        data.clear() 
        grid.data = pd.DataFrame() 

        with output:
            output.clear_output()
            display(display_scrollable_df(pd.DataFrame(columns=['Asset', 'Sign', 'Limit']))) 

        with extra_controls_box:
            extra_controls_box.clear_output()

    clear_btn = widgets.Button(description='Clear All', button_style='danger')
    clear_btn.on_click(on_clear_clicked)

    def on_add_click(b):
        if grid.data is None or grid.data.empty:
            return
        new_row = np.zeros(prices.shape[1])
        label = f"Allocation {grid.data.shape[0]-1}"
        new_df = pd.DataFrame([new_row], columns=grid.data.columns, index=[label])
        updated_df = pd.concat([pd.DataFrame(grid.data), new_df])
        grid.data = updated_df
        update_extra_controls(updated_df)

    def clear_allocation(b):
        if constraint_container.get('allocation_df') is not None:
            grid.data = constraint_container['allocation_df']
            update_extra_controls(constraint_container['allocation_df'])

    button_add = widgets.Button(description="Add Allocation")    
    button_clear = widgets.Button(description="Clear Allocation")
    button_add.on_click(on_add_click)
    button_clear.on_click(clear_allocation)

    def on_plot_click(b):
        with plot_output:
            plot_output.clear_output()
            if grid.data is not None and not grid.data.empty:
                portfolio_returns = rebalanced_time_series(prices, grid.data, frequency=frequency.value)
                ptf_drawdown=pd.DataFrame((((portfolio_returns-portfolio_returns.cummax()))/portfolio_returns.cummax()))

                fig = px.line(portfolio_returns, title="Portfolio Returns")
                fig.update_layout(plot_bgcolor="black", paper_bgcolor="black", font_color="white") 
                fig.update_traces(textfont=dict(family="Arial Narrow"))

                fig2=px.line(ptf_drawdown, title="Portfolio Drawdown",color_discrete_sequence = px.colors.sequential.Sunsetdark,render_mode='svg')
                fig2.update_layout(plot_bgcolor="black", paper_bgcolor="black", font_color="white") 
                fig2.update_traces(textfont=dict(family="Arial Narrow"))

                portfolio_rebalanced_returns = rebalanced_metrics(portfolio_returns)
                expected_metrics = get_expected_metrics(returns, grid.data)
                portfolio_risk = get_portfolio_risk(grid.data, prices, portfolio_returns, benchmark.value)
                transparency_exposure=(transparency_table@grid.data.T).round(4)
                with portfolio_rebalanced_output:
                    portfolio_rebalanced_output.clear_output()
                    display(display_scrollable_df(portfolio_rebalanced_returns))

                    display(display_scrollable_df(expected_metrics))
                    display(display_scrollable_df(portfolio_risk))
                    fig.show()
                    fig2.show()
                    display(display_scrollable_df(portfolio_returns))

                    display(display_scrollable_df(transparency_exposure))
            else:
                print("No allocation data to plot.")

    plot_btn = widgets.Button(description="Plot Returns")
    plot_btn.on_click(on_plot_click)

    def on_plot_click_frontier(b):
        if grid.data is None or grid.data.empty:
            return "No allocation defined"
            
        with frontier_output:
            frontier_output.clear_output()
            indicators, fig = get_frontier(returns, grid.data)
            fig.show()
            display(display_scrollable_df(indicators))
            get_correlation_matrix(returns)

    plot_btn_frontier = widgets.Button(description="Plot Frontier")
    frontier_output = widgets.Output()
    plot_btn_frontier.on_click(on_plot_click_frontier)

        # UI Construction
    
    aligned_controls = widgets.HBox([
        frequency,
        benchmark
    ], layout=widgets.Layout(
        display='flex',
        justify_content='center', 
        align_items='center',     
        spacing='10px',            
        width='auto'                
    ))
    
    aligned_buttons = widgets.HBox([
    plot_btn,  
    plot_btn_frontier  
], layout=widgets.Layout(
    display='flex',
    justify_content='center',  
    align_items='center',      
    spacing='10px',           
    width='auto'               
))

    aligned_controls_with_buttons = widgets.VBox([
        aligned_controls,       
        aligned_buttons        
    ], layout=widgets.Layout(
        display='flex',
        flex_direction='column', 
        justify_content='center', 
        align_items='center',     
        width='100%'           
    ))
    
    centered_controls_ui = widgets.Box(
        [aligned_controls_with_buttons],
        layout=widgets.Layout(
            display='flex',
            justify_content='center',
            align_items='center',   
            width='100%',           
            padding='10px'          
        ))

    
    constraint_ui = widgets.VBox([
        widgets.VBox([dropdown1, dropdown2, dropdown3]),
        widgets.HBox([add_constraint_btn, clear_btn, optimize_btn]),
        output
    ])
    
    centered_constraint_ui = widgets.Box(
        [constraint_ui],
        layout=widgets.Layout(
            display='flex',
            justify_content='center',
            align_items='center',
            width='auto',
            padding='10px'
        )
    )
    
    grid_ui = widgets.VBox([
        grid,
        widgets.HBox([button_add, button_clear]),
        grid_button_output
    ])
    
    grid.layout = widgets.Layout(
        width='auto',              
        height='auto',             
        min_height='100px',        
    )
        
    centered_grid_ui = widgets.Box(
        [grid_ui],
        layout=widgets.Layout(
            display='Block',  
            justify_content='center',  
            align_items='center',
            width='auto',
            overflow='visible',
            padding='10px'
        )
    )

    display(widgets.HTML('''
        <style>
            .widget-output, .widget-box {
                overflow: visible !important;
                max-height: none !important;
            }
        </style>
    '''))
    display(widgets.VBox([
        output_returns,
        centered_constraint_ui,
        centered_grid_ui,
        centered_controls_ui,  
        extra_controls_box,
        portfolio_rebalanced_output,
        widgets.VBox([plot_output]), 
        widgets.VBox([frontier_output]) 
    ]))

In [30]:
file=pd.ExcelFile('Indices.xlsx')
prices_original=file.parse(file.sheet_names[0],index_col=0)
prices_original=(1+prices_original).cumprod()
transparency_table=file.parse(file.sheet_names[1],index_col=0)
transparency_table=transparency_table[prices_original.columns]
transparency_matrix=(transparency_table.T/100).to_numpy()

# transparency_table.T

In [32]:
data = display_app(prices_original)

HTML(value='\n        <style>\n            .widget-output, .widget-box {\n                overflow: visible !i…

VBox(children=(Output(), Box(children=(VBox(children=(VBox(children=(Dropdown(description='Assets:', options=(…