In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [3]:
# DEVELOP OUR OWN FUNCTION OF CALCULATING ANNUALIZED RETURN
# Make sure that the returns are in decimal formats
def r_ann(r, periods_per_year):
    obs = r.shape[0]
    hpr = (1 + r).prod()            # print this if we need to display HPR
    r_compound = hpr**(1/obs) - 1   # print this if we need to display monthly compound return
    return (1 + r_compound) ** 12 - 1

In [5]:
# DEVELOP OUR OWN FUNCTION OF CALCULATING ANNUALIZED VOLATILITY
def std_ann(r, periods_per_year):
    sd = r.std()                    # print this if we need to display monthly standard deviation
    return sd * np.sqrt(12)

In [7]:
# DEVELOP OUR OWN FUNCTION OF CALCULATING THE SHARPE RATIO
def sharpe_ratio(r, risk_free, periods_per_year):
    r_annualized = r_ann(r = r, periods_per_year = periods_per_year)
    sd_annualized = std_ann(r = r, periods_per_year = periods_per_year)
    r_excess =  r_annualized - risk_free
    return r_excess / sd_annualized

In [9]:
# DEVELOP OUR OWN FUNCTION OF CALCULATING MAX DRAWDOWN
def drawdown(return_series: pd.Series):
    wealth_index = 1000*(1 + return_series).cumprod()
    previous_peak = wealth_index.cummax()
    drawdown =  (wealth_index - previous_peak)/previous_peak
    df = pd.DataFrame({
    'Wealth' : wealth_index,
    'Peaks' : previous_peak,
    'Drawdowns' : drawdown })
    return df

In [11]:
# DEVELOP THE JARQUE BERA TEST FOR NORMALITY
# For this test we set at 1% by default
def is_normal(r, level = 0.01):
    statistic, p_value = scipy.stats.jarque_bera(r)
    return p_value > level
# Example: mnp.is_normal(df) -> this will work on the whole data set; df.aggregate(is_normal) -> this will work by each column

In [13]:
# DEVELOP OUR OWN FUNCTION OF CALCULATING SEMI-DEVIATION
def semi_dev(r):
    excess= r-r.mean()                                        # We demean the returns
    excess_negative = excess[excess<0]                        # We take only the returns below the mean
    excess_negative_square = excess_negative**2               # We square the demeaned returns below the mean
    n_negative = (excess<0).sum()                             # number of returns under the mean
    return (excess_negative_square.sum()/n_negative)**0.5     # semideviation

In [15]:
# MULTIPLE ASSETS FRONTIERS AND CAL WITH LEVERAGE (SUM CAN DIFFER FROM 1)
# USING LAGRANGE MULTIPLIERS (ONLY WORKS FOR UNLIMITED SHORT SELLING). ACTUALLY I FEEL THAT THIS APPROACH IS MORE INTUITIVELY IN TERMS OF CODING

# 1. When we only need the efficient frontier (Markowitz frontier)

# Calculate the Markowitz portfolios from a given targeted returns => define the targeted_returns matrix first
def ef_unlimited_calculation(n_assets, return_matrix, covar_matrix, targeted_returns):
    """
    Calculate the standard deviation (risk) of the portfolio for a range of target returns using Lagrange multipliers.
    Allows for unlimited short selling.
    Returns a DataFrame with standard deviation (risk) and targeted returns, and the minimum-variance portfolio (std_mv, r_mv).
    """
    e = np.ones((n_assets, 1))  # 1-vector as a column vector
    return_matrix = return_matrix.reshape((n_assets, 1))  # Ensure return_matrix is a column vector
    
    # Alpha, delta, and zeta calculations
    alpha = (return_matrix.T @ np.linalg.inv(covar_matrix) @ e).item()  # Scalar
    delta = (e.T @ np.linalg.inv(covar_matrix) @ e).item()  # Scalar
    zeta = (return_matrix.T @ np.linalg.inv(covar_matrix) @ return_matrix).item()  # Scalar
    
    r_mv = alpha / delta  # Expected return of the minimum variance portfolio
    
    # Standard deviation of the minimum variance portfolio
    std_mv = np.sqrt(1 / delta)

    # Lists to store results for the efficient frontier
    std_devs = []
    targeted_returns_list = []

    # Calculate portfolio variance and standard deviation for each target return
    for targeted_return in targeted_returns:
        var_portfolio = (1 / delta) + ((delta / (zeta * delta - alpha**2)) * ((targeted_return - r_mv)**2))
        std_dev = np.sqrt(var_portfolio)
        std_devs.append(std_dev)
        targeted_returns_list.append(targeted_return)
    
    # Create a DataFrame with targeted returns and corresponding risks
    df = pd.DataFrame({
        'Targeted Return': targeted_returns_list,
        'Standard Deviation': std_devs })
    
    return df, std_mv, r_mv  # Return DataFrame and minimum-variance portfolio (std_mv, r_mv)

In [17]:
def ef_unlimited_plot(df, std_mv, r_mv, mode = 'separate'):
    """
    Plot the efficient frontier using the DataFrame containing standard deviations and targeted returns.
    Mark the Minimum-Variance Portfolio on the plot.
    Display the region below the MVP as a dashed grey line if mode='separate'.
    
    Parameters:
    df (DataFrame): DataFrame containing 'Standard Deviation' and 'Targeted Return'.
    std_mv (float): Standard deviation of the minimum-variance portfolio.
    r_mv (float): Targeted return of the minimum-variance portfolio.
    mode (str): 'separate' to split the line into efficient and inefficient regions. 
                Any other value will plot the full frontier as a solid line.
    """
    plt.figure(figsize=(15, 7))

    if mode == 'separate':
        # Separate the data points below and above the minimum-variance portfolio
        below_mvp = df[df['Targeted Return'] < r_mv]
        above_mvp = df[df['Targeted Return'] >= r_mv]

        # Plot the part below the MVP as a dashed grey line
        if not below_mvp.empty:
            plt.plot(below_mvp['Standard Deviation'], below_mvp['Targeted Return'], linestyle='--', color='grey', linewidth=2, label="Inefficient Region")

        # Plot the part above the MVP as a solid blue line
        if not above_mvp.empty:
            plt.plot(above_mvp['Standard Deviation'], above_mvp['Targeted Return'], 'b-', label="Efficient Frontier", linewidth=2)
    else:
        # Plot the entire efficient frontier as a solid line
        plt.plot(df['Standard Deviation'], df['Targeted Return'], 'b-', label="Efficient Frontier", linewidth=2)

    # Plot the Minimum-Variance Portfolio
    plt.scatter(std_mv, r_mv, color='red', label='Minimum-Variance Portfolio', s=100, zorder=5)
    
    plt.xlabel("Standard Deviation", fontsize=14, color = 'navy')
    plt.ylabel("Return", fontsize=14, color = 'navy')
    # plt.yticks(np.arange(0.00 ,0.021, 0.0005)) # OPTIONAL
    plt.title("Efficient Frontier", fontsize=16, color='red')
    plt.grid(True) # OPTIONAL
    plt.legend(loc='best')
    plt.show()

In [19]:
# 2. When we need the Capital Allocation Line (CAL)
# To incorporate the risk-free asset into your current model, you can plot the Capital Allocation Line (CAL), which shows the risk-return tradeoff between a risk-free asset and the optimal risky portfolio (in your case, this will be the portfolio with the highest Sharpe ratio on the efficient frontier). The slope of the CAL is the Sharpe ratio of the optimal risky portfolio, and it starts at the risk-free rate on the y-axis.

def ef_unlimited_cal_plot(df, std_mv, r_mv, risk_free_rate, mode='separate'):
    """
    Plot the efficient frontier along with the Capital Allocation Line (CAL).
    Mark the Minimum-Variance Portfolio and the Tangency Portfolio (Max Sharpe Ratio Portfolio).

    Parameters:
    df (DataFrame): DataFrame containing 'Standard Deviation' and 'Targeted Return'.
    std_mv (float): Standard deviation of the minimum-variance portfolio.
    r_mv (float): Targeted return of the minimum-variance portfolio.
    risk_free_rate (float): The risk-free rate used to calculate the CAL line.
    mode (str): 'separate' to split the line into efficient and inefficient regions.
                'single' to plot the full frontier as a solid line.
    """
    plt.figure(figsize=(15, 7))

    # Calculate the Sharpe ratio for each portfolio on the frontier
    df['Sharpe Ratio'] = (df['Targeted Return'] - risk_free_rate) / df['Standard Deviation']
    
    # Identify the maximum Sharpe ratio portfolio (tangency portfolio)
    max_sharpe_idx = df['Sharpe Ratio'].idxmax()
    max_sharpe_portfolio = df.loc[max_sharpe_idx]
    max_sharpe_std = max_sharpe_portfolio['Standard Deviation']
    max_sharpe_return = max_sharpe_portfolio['Targeted Return']

    # Plot the efficient frontier
    if mode == 'separate':
        # Sort data by targeted return (ascending)
        df_sorted = df.sort_values(by='Targeted Return')

        # Separate the data points below and above the minimum-variance portfolio
        below_mvp = df_sorted[df_sorted['Targeted Return'] < r_mv]
        above_mvp = df_sorted[df_sorted['Targeted Return'] >= r_mv]

        # Plot the part below the MVP as a dashed grey line
        if not below_mvp.empty:
            plt.plot(below_mvp['Standard Deviation'], below_mvp['Targeted Return'], linestyle='--', color='grey', linewidth=2, label="Inefficient Region")

        # Plot the part above the MVP as a solid blue line
        if not above_mvp.empty:
            plt.plot(above_mvp['Standard Deviation'], above_mvp['Targeted Return'], 'b-', label="Efficient Frontier", linewidth=2)
    else:
        # Plot the entire efficient frontier as a solid line
        plt.plot(df['Standard Deviation'], df['Targeted Return'], 'b-', label="Efficient Frontier", linewidth=2)

    # Plot the Minimum-Variance Portfolio
    plt.scatter(std_mv, r_mv, color='red', label='Minimum-Variance Portfolio', s=100, zorder=5)

    # Plot the Tangency Portfolio (Max Sharpe Ratio Portfolio)
    plt.scatter(max_sharpe_std, max_sharpe_return, color='green', label='Max Sharpe Ratio Portfolio', s=100, zorder=5)

    # Calculate and plot the CAL line
    cal_x = np.linspace(0, max_sharpe_std*1.5, 100)  # Extend the CAL line beyond the max Sharpe portfolio
    cal_y = risk_free_rate + (max_sharpe_return - risk_free_rate) / max_sharpe_std * cal_x  # CAL equation

    plt.plot(cal_x, cal_y, linestyle='--', color='orange', label='Capital Allocation Line (CAL)', linewidth=2)
    
    plt.xlabel("Standard Deviation", fontsize=14, color =  'navy')
    plt.ylabel("Targeted Return", fontsize=14, color =  'navy')
    plt.title("Efficient Frontier with CAL", fontsize=16, color='red')
    # plt.yticks(np.arange(0.00 ,0.021, 0.0005)) # OPTIONAL
    plt.grid(True) # OPTIONAL
    plt.legend(loc='best')
    plt.show()

In [21]:
# 3. If we need to access the Max Sharpe Ratio portfolio
# HERE WE DEVELOP A SPECIAL FUNCTION FOR THE MAX SHARPE RATIO PORTFOLIO USING THE LAGRANGE APPROACH WHICH IS MORE ANALYTICALLY ACCURATE. WE CAN USE THE ef_msr_unlimited above to get the weights and vector then calculate other parameters. However, I write a comprehensive one for it for convenience when needed. The reason that I don't plug this into the above process is because this one is of lower efficiency in terms of coding, thus I will use the Scipy optimizer instead. The differences between the 2 methods are significantly minor so it's not a big problem at all.
# NOAH HAS COMPARED THE RESULT BETWEEN THE LAGRANGE APPROACH AND SCIPY OPTIMIZER, THUS JUST KEEP FINE TUNING EVERY THING

def ef_unlimited_msr_components(n_assets, asset_names, return_matrix, covar_matrix, risk_free):

    e = np.ones((n_assets, 1))  # 1-vector as a column vector
    return_matrix = return_matrix.reshape((n_assets, 1))  # Ensuring return_matrix is a column vector
    
    # Alpha, delta, and zeta calculations
    alpha = (return_matrix.T @ np.linalg.inv(covar_matrix) @ e).item()  # Extract scalar
    delta = (e.T @ np.linalg.inv(covar_matrix) @ e).item()  # Extract scalar
    zeta = (return_matrix.T @ np.linalg.inv(covar_matrix) @ return_matrix).item()  # Extract scalar
    r_mv = alpha / delta  # Expected return of the minimum variance portfolio

    # Sharpe ratio related terms
    special_term = zeta - (2 * alpha * risk_free) + (delta * risk_free**2)
    tangency_slope = special_term ** 0.5
    tangency_er = (alpha * risk_free - zeta) / (delta * risk_free - alpha)
    tangency_sd = -tangency_slope / (delta * (risk_free - r_mv))

    # Calculate portfolio weights for the tangency portfolio
    lmbda = (tangency_er - risk_free) / special_term  # Lambda term for weight scaling
    tangency_weights = lmbda * np.linalg.inv(covar_matrix) @ (return_matrix - risk_free * e)  # Tangency portfolio weights
    
    # Calculate total weight of tangency portfolio
    total_tangency_weight = np.sum(tangency_weights)
    
    # Calculate risk-free allocation
    risk_free_allocation = 1 - total_tangency_weight
    
    # Format the output with asset names
    weights_info = "\n".join([f'{name}: {weight:.4f}' for name, weight in zip(asset_names, tangency_weights.flatten())])

    return (f'The weights vector of the tangency portfolio is:\n{weights_info};\n'
            f'The allocation to the risk-free asset is {risk_free_allocation:.4f};\n'
            f'The expected return of the tangency portfolio is {tangency_er:.4f};\n'
            f'The standard deviation of the tangency portfolio is {tangency_sd:.4f};\n'
            f'The slope of the tangency portfolio (max Sharpe ratio) is {tangency_slope:.4f}.')

In [23]:
# REGRESSION FOR ALPHA AND BETA OF ASSETS (CAPM)
import statsmodels.api as sm

def capm_1_asset(stock_returns, market_returns, risk_free_rate):
    """
    Calculate the CAPM alpha and beta for a given stock and market returns.
    
    Parameters:
    - stock_returns (pd.Series): The stock return series -> y
    - market_returns (pd.Series): The market return series -> x variable
    - risk_free_rate (float): The risk-free rate. Default is 0.0.
    
    Returns:
    - dict: A dictionary containing alpha, beta, and the regression model summary.
    """
    # Step 1: Calculate Excess Returns
    stock_excess_returns = stock_returns - risk_free_rate
    market_excess_returns = market_returns - risk_free_rate

    # Step 2: Align the Data
    data = pd.DataFrame({
        'Stock Excess Return': stock_excess_returns,
        'Market Excess Return': market_excess_returns }).dropna()
    
    # Step 3: Add Constant to Market Returns for Intercept (Alpha)
    X = sm.add_constant(data['Market Excess Return'])  # Adds a constant term to the predictor
    
    # Step 4: Run the Regression
    model = sm.OLS(data['Stock Excess Return'], X).fit()
    
    # Step 5: Extract Alpha (intercept) and Beta (slope)
    alpha = model.params['const']
    beta = model.params['Market Excess Return']
    
    # Return the results as a dictionary
    return {
        'alpha': alpha,
        'beta': beta,
        'model_summary': model.summary()}

In [25]:
# Code to run loop for multiple assets
def capm_mul_assets_full(data_frame, market_vector, risk_free):
    results = {}
    # Iterate through each column in the DataFrame 'data'
    for col in data_frame.columns:
    # Calculate CAPM alpha and beta for each column and store it in the results dictionary
        results[col] = capm_1_asset(data_frame[col], market_returns = market_vector, risk_free_rate =  risk_free)
    return results


# Further code to add the loop results as a DataFrame
def capm_mul_assets_coef(results): # the results is the variable assigned to capm_mul_assets_full

    # Initialize lists to store extracted alpha and beta values
    alpha_values = []
    beta_values = []
    
    # Iterate through the results dictionary to extract alpha and beta values
    for col, result in results.items():
        alpha_values.append((col, result['alpha']))  # Add (column name, alpha) tuple to list
        beta_values.append((col, result['beta']))    # Add (column name, beta) tuple to list
        
    # Convert the extracted values into DataFrames
    alpha_df = pd.DataFrame(alpha_values, columns=['Assets', 'ALPHA']).set_index('Assets')
    beta_df = pd.DataFrame(beta_values, columns=['Assets', 'BETA']).set_index('Assets')
    
    # Combine alpha and beta DataFrames into a single DataFrame
    alpha_beta_df = pd.concat([alpha_df, beta_df], axis=1)

    return alpha_beta_df

In [27]:
# CALCULATE BETA OF 1 ASSET
# Use loops if we need to calculate for multiple assets
def calc_beta_1(asset_returns: pd.Series, market_returns: pd.Series) -> float:
    """
    Calculates the beta of an asset given its returns and market returns.
    
    Parameters:
    asset_returns (pd.Series): Daily/weekly/monthly returns of the asset.
    market_returns (pd.Series): Corresponding returns of the market index.
    
    Returns:
    float: Beta of the asset, indicating its systematic risk relative to the market.
    
    Raises:
    ValueError: If the input series are not of the same length or are empty.
    """
    if len(asset_returns) != len(market_returns):
        raise ValueError("The length of asset returns and market returns must be the same.")
    
    if len(asset_returns) == 0 or len(market_returns) == 0:
        raise ValueError("The input series must not be empty.")
    
    # Calculate covariance between asset returns and market returns
    covariance_matrix = np.cov(asset_returns, market_returns)
    
    # Extract the covariance between asset and market (element [0, 1])
    covariance = covariance_matrix[0, 1]
    
    # Calculate variance of the market returns (element [1, 1])
    variance_market = covariance_matrix[1, 1]
    
    # Calculate and return beta
    beta = covariance / variance_market
    return round(beta,2)

In [29]:
# PLOT THE ASSET ON THE SML LINE
def sml_plot_multiple(beta_values: np.ndarray, risk_free_rate: float, market_return: float, assets_data: list):

    # Calculate expected returns using CAPM formula
    expected_returns = risk_free_rate + beta_values * (market_return - risk_free_rate)

    # Plot the SML
    plt.figure(figsize=(10, 6))
    plt.plot(beta_values, expected_returns, label='Security Market Line (SML)', color='blue', lw=2)

    # Add labels and title
    plt.title('Security Market Line (SML) with Multiple Assets')
    plt.xlabel('Beta')
    plt.ylabel('Expected Return')
    plt.grid(True)
    
    # Add an example point for the market portfolio (beta = 1)
    plt.scatter([1], [market_return], color='red', zorder=5, label="Market Portfolio (Beta = 1)")
    
    # Plot each asset with its actual return and beta
    for asset_beta, actual_return, label in assets_data:
        plt.scatter([asset_beta], [actual_return], label=label)
    
    # Display legend
    plt.legend()
    # Show plot
    plt.show()