# Library Imports

In [41]:
import numpy as np
import datetime as dt
import pandas as pd
import yfinance as yf
from scipy.optimize import minimize
import plotly.graph_objects as go
import plotly.express as px

# Needed Functions

In [None]:
def get_data(tickers, start=None, end=None):
    """
    Fetch historical close price data for given tickers using yfinance.

    Parameters:
        tickers (list or str): One or more ticker symbols.
        start (str, optional): Start date in 'YYYY-MM-DD' format.
        end (str, optional): End date in 'YYYY-MM-DD' format.

    Returns:
        DataFrame: Close prices for the specified tickers.
    """
    if start is None and end is None:
        data = yf.download(tickers=tickers)
        return data["Close"]
    data = yf.download(tickers=tickers, start=start, end=end)
    return data["Close"]


def get_return(price_data, mean_cov=True):
    """
    Calculate daily returns from price data and optionally return mean returns and covariance matrix.

    Parameters:
        price_data (DataFrame): Historical price data.
        mean_cov (bool): If True, return [mean_returns, covariance_matrix]; otherwise, return raw returns.

    Returns:
        list or DataFrame: [mean_returns, covariance_matrix] if mean_cov is True, else DataFrame of returns.
    """
    returns = price_data.pct_change().dropna()
    if mean_cov:
        return [returns.mean(), returns.cov()]
    return returns

def get_correlation(price_data, color_scale='RdBu_r', title="Asset Correlation Matrix", return_fig=True):
    """
    Calculate and plot the correlation matrix as a heatmap for a given price data DataFrame.
    
    Parameters:
        price_data (DataFrame): Historical price data.
        color_scale (str): The color scale to use for the heatmap.
        title (str): The title for the heatmap.
        return_fig (bool): If True, returns the figure instead of showing it immediately.
    
    Returns:
        Figure: (Optional) The Plotly figure if return_fig is True.
    """
    if price_data is None or price_data.empty:
        print("No valid price data provided.")
        return None

    returns = price_data.pct_change().dropna()
    corr_matrix = returns.corr()
    fig = px.imshow(
        corr_matrix,
        text_auto=True,
        color_continuous_scale=color_scale,
        title=title
    )
    fig.update_layout(
        xaxis_title="Features",
        yaxis_title="Features"
    )
    
    if return_fig:
        return fig
    else:
        fig.show()


def portfolio_performance(weights, mean_returns, cov_matrix, trading_days=365):
    """
    Calculates the annualized portfolio return and standard deviation.

    Parameters:
        weights (array-like): Portfolio weights.
        mean_returns (Series): Mean of daily returns.
        cov_matrix (DataFrame): Covariance matrix of daily returns.
        trading_days (int): Number of trading days per year.

    Returns:
        list: [annualized return, annualized standard deviation] in decimal form.
    """
    annual_return = np.sum(mean_returns * weights) * trading_days
    annual_std = np.sqrt(weights.T @ (cov_matrix @ weights)) * np.sqrt(trading_days)
    return [annual_return, annual_std]


def neg_sharpe_ratio(weights, mean_returns, cov_matrix, risk_free_rate=0.0, trading_days=365):
    """
    Computes the negative Sharpe ratio for a given set of portfolio weights.

    Parameters:
        weights (array-like): Portfolio weights.
        mean_returns (Series): Mean of daily returns.
        cov_matrix (DataFrame): Covariance matrix of daily returns.
        risk_free_rate (float): Annualized risk-free rate.
        trading_days (int): Number of trading days per year.

    Returns:
        float: Negative Sharpe ratio.
    """
    annual_return, annual_std = portfolio_performance(weights, mean_returns, cov_matrix, trading_days)
    # Use np.isclose to avoid division by zero issues with floating-point precision
    if np.isclose(annual_std, 0):
        return np.inf
    sharpe_ratio = (annual_return - risk_free_rate) / annual_std
    return -sharpe_ratio


def optimize_sharpe(mean_returns, cov_matrix, risk_free_rate=0.0, trading_days=365, constraint_set=(0, 1)):
    """
    Optimize the portfolio for maximum Sharpe ratio by minimizing the negative Sharpe ratio.

    Parameters:
        mean_returns (Series): Mean daily returns.
        cov_matrix (DataFrame): Covariance matrix of daily returns.
        risk_free_rate (float): Annualized risk-free rate.
        trading_days (int): Number of trading days per year.
        constraint_set (tuple): Bounds for the weights (default (0, 1)).

    Returns:
        OptimizeResult: The optimization result from scipy.optimize.minimize.
    """
    n_assets = len(mean_returns)
    args = (mean_returns, cov_matrix, risk_free_rate, trading_days)
    constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]
    weights_bounds = tuple(constraint_set for _ in range(n_assets))
    init_guess = n_assets * [1. / n_assets]

    result = minimize(
        neg_sharpe_ratio,
        init_guess,
        args=args,
        method="SLSQP",
        bounds=weights_bounds,
        constraints=constraints
    )
    
    if not result.success:
        print("Optimization for maximum Sharpe ratio failed:", result.message)
    return result


def get_portfolio_variance(weights, mean_returns, cov_matrix, trading_days=365):
    """
    Calculate the annualized portfolio standard deviation (volatility).

    Parameters:
        weights (array-like): Portfolio weights.
        mean_returns (Series): Mean of daily returns.
        cov_matrix (DataFrame): Covariance matrix of daily returns.
        trading_days (int): Number of trading days per year.

    Returns:
        float: Annualized portfolio volatility.
    """
    _, annual_std = portfolio_performance(weights, mean_returns, cov_matrix, trading_days)
    return annual_std


def optimize_variance(mean_returns, cov_matrix, trading_days=365, constraint_set=(0, 1)):
    """
    Optimize the portfolio for minimum variance (risk).

    Parameters:
        mean_returns (Series): Mean of daily returns.
        cov_matrix (DataFrame): Covariance matrix of daily returns.
        trading_days (int): Number of trading days per year.
        constraint_set (tuple): Bounds for weights (default (0,1)).

    Returns:
        OptimizeResult: The optimization result from scipy.optimize.minimize.
    """
    n_assets = len(mean_returns)
    args = (mean_returns, cov_matrix, trading_days)
    constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]
    weights_bounds = tuple(constraint_set for _ in range(n_assets))
    init_guess = n_assets * [1. / n_assets]

    result = minimize(
        get_portfolio_variance,
        init_guess,
        args=args,
        method="SLSQP",
        bounds=weights_bounds,
        constraints=constraints
    )
    
    if not result.success:
        print("Optimization for minimum variance failed:", result.message)
    return result


def get_portfolio_return(weights, mean_returns, cov_matrix, trading_days=365):
    """
    Calculate the annualized portfolio return.

    Parameters:
        weights (array-like): Portfolio weights.
        mean_returns (Series): Mean daily returns.
        cov_matrix (DataFrame): Covariance matrix of daily returns.
        trading_days (int): Number of trading days per year.

    Returns:
        float: Annualized portfolio return in decimal form.
    """
    annual_return, _ = portfolio_performance(weights, mean_returns, cov_matrix, trading_days)
    return annual_return


def efficient_optimization(mean_returns, cov_matrix, return_target, constraint_set=(0, 1), trading_days=365):
    """
    Optimize the portfolio to achieve at least a target annual return while minimizing variance.

    Parameters:
        mean_returns (Series): Mean daily returns.
        cov_matrix (DataFrame): Covariance matrix of daily returns.
        return_target (float): Desired annual return target in decimal form.
        constraint_set (tuple): Bounds for each asset weight (default (0,1)).
        trading_days (int): Number of trading days per year.

    Returns:
        OptimizeResult: The optimization result from scipy.optimize.minimize.
    """
    n_assets = len(mean_returns)
    args = (mean_returns, cov_matrix, trading_days)
    constraints = [
        {
            "type": "ineq",
            "fun": lambda x: get_portfolio_return(x, mean_returns, cov_matrix, trading_days) - return_target
        },
        {
            "type": "eq",
            "fun": lambda x: np.sum(x) - 1
        }
    ]
    weights_bounds = tuple(constraint_set for _ in range(n_assets))
    init_guess = n_assets * [1. / n_assets]

    result = minimize(
        get_portfolio_variance,
        init_guess,
        args=args,
        method="SLSQP",
        bounds=weights_bounds,
        constraints=constraints
    )
    
    if not result.success:
        print("Efficient optimization failed for target return", return_target, ":", result.message)
    return result


def calculated_result(mean_returns, cov_matrix, risk_free_rate=0.0, constraints_set=(0, 1), trading_days=365):
    """
    Compute portfolios optimized for:
      - Maximum Sharpe ratio.
      - Minimum variance.
      - Efficient frontier (minimizing variance for a series of target returns).

    Returns:
        Tuple containing:
          - maxSR_return_disp, maxSR_std_disp: Performance metrics (in percentage) for the max Sharpe ratio portfolio.
          - maxSR_allocation: DataFrame of asset weightings for the max Sharpe portfolio.
          - minVar_return_disp, minVar_std_disp: Performance metrics (in percentage) for the minimum variance portfolio.
          - minVar_allocation: DataFrame of asset weightings for the min variance portfolio.
          - effopt_list: List of portfolio variances (risks) for different target returns along the efficient frontier.
    """
    # Optimize maximum Sharpe ratio portfolio
    maxSR_portfolio = optimize_sharpe(mean_returns, cov_matrix, risk_free_rate, trading_days, constraints_set)
    maxSR_weights = maxSR_portfolio["x"]
    maxSR_return_dec, maxSR_std_dec = portfolio_performance(maxSR_weights, mean_returns, cov_matrix, trading_days)
    # Keep original decimal values for internal calculations
    # Convert to percentage for display
    maxSR_return_disp, maxSR_std_disp = round(maxSR_return_dec * 100, 2), round(maxSR_std_dec * 100, 2)
    maxSR_allocation = pd.DataFrame(maxSR_weights, index=mean_returns.index, columns=["Weightings"])
    maxSR_allocation["Weightings"] = [round(x * 100, 1) for x in maxSR_allocation["Weightings"]]

    # Optimize minimum variance portfolio
    minVar_portfolio = optimize_variance(mean_returns, cov_matrix, trading_days, constraints_set)
    minVar_weights = minVar_portfolio["x"]
    minVar_return_dec, minVar_std_dec = portfolio_performance(minVar_weights, mean_returns, cov_matrix, trading_days)
    # Convert to percentage for display
    minVar_return_disp, minVar_std_disp = round(minVar_return_dec * 100, 2), round(minVar_std_dec * 100, 2)
    minVar_allocation = pd.DataFrame(minVar_weights, index=mean_returns.index, columns=["Weightings"])
    minVar_allocation["Weightings"] = [round(x * 100, 1) for x in minVar_allocation["Weightings"]]

    # Generate efficient frontier data using target returns in decimal form
    effopt_list = []
    target_returns = np.linspace(minVar_return_dec, maxSR_return_dec, num=20)
    for target in target_returns:
        opt_result = efficient_optimization(mean_returns, cov_matrix, return_target=target, constraint_set=constraints_set, trading_days=trading_days)
        if not opt_result.success:
            effopt_list.append(np.nan)
        else:
            effopt_list.append(opt_result["fun"])
    result = {
        "maxSR": {
            "return_disp": maxSR_return_disp,
            "return_ori": maxSR_return_dec,
            "std_disp": maxSR_std_disp,
            "std_ori": maxSR_std_dec,
            "allocation": maxSR_allocation
        },
        "minVar": {
            "return_disp": minVar_return_disp,
            "return_ori": minVar_return_dec,
            "std_disp": minVar_std_disp,
            "std_ori": minVar_std_dec,
            "allocation": minVar_allocation
        },
        "efficient_frontier": effopt_list,
        "target_returns": target_returns
    }

    return result

def plot_efficient_frontier(mean_returns, cov_matrix, risk_free_rate=0.0, constraints_set=(0, 1), trading_days=365):
    """
    Create an interactive Plotly visualization of the efficient frontier,
    maximum Sharpe ratio, and minimum variance portfolios.
    """
    results = calculated_result(mean_returns, cov_matrix, risk_free_rate, constraints_set, trading_days)
    
    # Maximum Sharpe Ratio point
    maxSR_plot = go.Scatter(
        name="Max Sharpe Ratio",
        mode="markers",
        x=[round(results["maxSR"]["std_ori"] * 100, 2)],
        y=[round(results["maxSR"]["return_ori"] * 100, 2)],
        marker=dict(color="#8883f0", size=18)
    )
    
    # Minimum Variance point
    minVar_plot = go.Scatter(
        name="Min Variance",
        mode="markers",
        x=[round(results["minVar"]["std_ori"] * 100, 2)],
        y=[round(results["minVar"]["return_ori"] * 100, 2)],
        marker=dict(color="#f39530", size=18)
    )
    
    # Efficient frontier curve
    # Convert volatilities and target returns to percentages
    eff_vol = [round(v * 100, 2) if not np.isnan(v) else np.nan for v in results["efficient_frontier"]]
    eff_ret = [round(t * 100, 2) for t in results["target_returns"]]
    eff_curve = go.Scatter(
        name="Efficient Frontier",
        mode="lines",
        x=eff_vol,
        y=eff_ret,
        line=dict(color="#75badb", width=3, dash="dot")
    )
    
    data = [maxSR_plot, minVar_plot, eff_curve]
    layout = go.Layout(
        title="Portfolio Optimization with Efficient Frontier",
        xaxis=dict(title="Annualized Volatility (%)"),
        yaxis=dict(title="Annualized Return (%)"),
        showlegend=True,
        legend=dict(
            x=0.8,
            y=0.1,
            traceorder="normal",
            bgcolor="#e6e6e6",
            bordercolor="white",
            borderwidth=1,
            font=dict(color="#222222")
        ),
        width=750,
        height=600
    )

    fig = go.Figure(data=data, layout=layout)
    return fig.show()

def get_asset_metrics(price_data, risk_free_rate=0.0, trading_days=365):
    """
    Calculate annual return, annual volatility, and Sharpe ratio for each asset,
    and return a DataFrame with these metrics.

    Parameters:
        price_data (DataFrame): Historical price data with assets as columns.
        risk_free_rate (float): Annualized risk-free rate (in decimal, e.g., 0.02 for 2%).
        trading_days (int): Number of trading days per year (365 for default value).

    Returns:
        DataFrame: Contains each asset's annual return (%), annual volatility (%), and Sharpe ratio.
    """

    # Calculate daily returns, daily volatility and drop NaN values
    returns = price_data.pct_change().dropna()
    mean_returns = returns.mean()
    daily_std = returns.std()

    # Calculate annual return and annual volatility
    annual_return = mean_returns * trading_days
    annual_std = daily_std * np.sqrt(trading_days)

    # Calculate sharpe ratio
    # (annual return - risk_free_rate) divided by annual volatility
    sharpe_ratio = (annual_return - risk_free_rate) / annual_std

    # Create a DataFrame with the metrics
    metrics_df = pd.DataFrame({
        "Annualized Return": round(annual_return*100, 2),
        "Annualized Volatility": round(annual_std*100, 2),
        "Sharpe Ratio": round(sharpe_ratio, 3)
    })
    
    return metrics_df

In [43]:
df = get_data(["DOGE-USD", "BTC-USD", "XRP-USD", "LTC-USD", "GOOG"])

[*********************100%***********************]  5 of 5 completed


In [44]:
mean_returns, cov_mtx_return = get_return(df)


The default fill_method='pad' in DataFrame.pct_change is deprecated and will be removed in a future version. Either fill in any non-leading NA values prior to calling pct_change or specify 'fill_method=None' to not fill NA values.



In [45]:
mean_returns

Ticker
BTC-USD     0.001623
DOGE-USD    0.004757
GOOG        0.000580
LTC-USD     0.001533
XRP-USD     0.002710
dtype: float64

In [46]:
maxSR = optimize_sharpe(
    mean_returns=mean_returns,
    cov_matrix=cov_mtx_return,
    risk_free_rate=0.0,
    constraint_set=(0, 1)
    )

maxSR

     fun: -1.2560941949128221
     jac: array([-2.76267529e-05,  5.24222851e-05, -4.90248203e-06,  4.48741391e-01,
        2.68220901e-07])
 message: 'Optimization terminated successfully'
    nfev: 69
     nit: 11
    njev: 11
  status: 0
 success: True
       x: array([1.36936838e-01, 1.27471321e-01, 6.00406007e-01, 3.80554963e-17,
       1.35185834e-01])

In [47]:
minVar = optimize_variance(
    mean_returns=mean_returns,
    cov_matrix=cov_mtx_return,
    constraint_set=(0, 1)
)
minVar

     fun: 0.29633701747972807
     jac: array([0.29733031, 0.34194367, 0.29622489, 0.33286845, 0.29701488])
 message: 'Optimization terminated successfully'
    nfev: 31
     nit: 5
    njev: 5
  status: 0
 success: True
       x: array([1.00697303e-01, 1.41358694e-17, 8.98273166e-01, 0.00000000e+00,
       1.02953088e-03])

In [48]:
np.round(minVar["x"], 5)

array([0.1007 , 0.     , 0.89827, 0.     , 0.00103])

In [49]:
calculated_result(mean_returns, cov_mtx_return)

{'maxSR': {'return_disp': 56.33,
  'return_ori': 0.563329655021945,
  'std_disp': 44.85,
  'std_ori': 0.4484772378563872,
  'allocation':           Weightings
  Ticker              
  BTC-USD         13.7
  DOGE-USD        12.7
  GOOG            60.0
  LTC-USD          0.0
  XRP-USD         13.5},
 'minVar': {'return_disp': 25.09,
  'return_ori': 0.25091524218033023,
  'std_disp': 29.63,
  'std_ori': 0.29633701747972807,
  'allocation':           Weightings
  Ticker              
  BTC-USD         10.1
  DOGE-USD         0.0
  GOOG            89.8
  LTC-USD          0.0
  XRP-USD          0.1},
 'efficient_frontier': [0.2963368337851502,
  0.2971149304266686,
  0.2989632002461843,
  0.30180945057860764,
  0.305624786716021,
  0.31037447047166816,
  0.31601555222242683,
  0.3225021808946128,
  0.3297836885022617,
  0.3378092792081962,
  0.3465265303944413,
  0.3558852761087667,
  0.3658360621265688,
  0.37633192383941116,
  0.38732855284750795,
  0.39878453006716186,
  0.410661419565715

In [50]:
a = efficient_optimization(mean_returns, cov_mtx_return, return_target=1)
a["x"].round(4)*100

array([18.15, 30.89, 19.03,  0.  , 31.93])

In [51]:
marker=dict(color="red", size=14, line=dict(width=3, color="black"))
marker

{'color': 'red', 'size': 14, 'line': {'width': 3, 'color': 'black'}}

In [52]:
plot_efficient_frontier(mean_returns=mean_returns, cov_matrix=cov_mtx_return)

In [53]:
get_correlation(df)


The default fill_method='pad' in DataFrame.pct_change is deprecated and will be removed in a future version. Either fill in any non-leading NA values prior to calling pct_change or specify 'fill_method=None' to not fill NA values.



In [73]:
get_asset_metrics(df)


The default fill_method='pad' in DataFrame.pct_change is deprecated and will be removed in a future version. Either fill in any non-leading NA values prior to calling pct_change or specify 'fill_method=None' to not fill NA values.



Unnamed: 0_level_0,Annualized Return,Annualized Volatility,Sharpe Ratio
Ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
BTC-USD,59.24,69.21,0.856
DOGE-USD,173.63,184.53,0.941
GOOG,21.18,30.45,0.695
LTC-USD,55.97,97.73,0.573
XRP-USD,98.92,117.92,0.839
