# Mean Variance Portfolio Optimization

In [1]:
import pandas as pd
import numpy as np
from sklearn.covariance import *


## Import data

In [2]:
df_raw_all = pd.read_hdf("dow30_full_selected.h5")


In [3]:
df_raw_prices = df_raw_all[["date", "tic", "close"]].rename(
    columns={"close": "price"})


In [4]:
df_prices = df_raw_prices.pivot(
    index="date", columns="tic", values="price").loc["2009-01-02":"2020-01-01"]


## Generate returns from prices

In [5]:
# Define functions
class Returns:
    def __init__(self, price_data: pd.DataFrame):
        """
        Initializes the Returns class instance with asset price data

        Parameters
        ----------
        price_data : pd.DataFrame
            Price data of assets
        """
        self.price_matrix = price_data.values.T
        self.assets = price_data.columns
        self.index = price_data.index

    @staticmethod
    def return_formula(
        price_matrix: np.ndarray,
        index: pd.Index,
        roll: bool = True,
        window: int = 1,
        log: bool = False,
    ) -> tuple:
        """"
        Convert price data to return data

        Parameters
        ----------
        price_matrix : np.ndarray
            Matrix of prices
        index : pd.Index
            Dates time scale
        roll : bool, optional
            Rolling return, by default True
        int : int, optional
            Time interval, by default 1
        log : bool, optional
            Continuous or discrete, by default False

        Returns
        -------
        tuple
            Returns a tuple of (array of returns, dates)
        """
        step = 1 if roll else window
        shift = window

        return (
            (
                np.log(
                    ((price_matrix / np.roll(price_matrix, shift=shift, axis=1)) - 1)
                )[:, shift::step],
                index[shift::step],
            )
            if log
            else (
                ((price_matrix / np.roll(price_matrix, shift=shift, axis=1)) - 1)[
                    :, shift::step
                ],
                index[shift::step],
            )
        )

    def compute_returns(self, method: str, **kwargs) -> pd.DataFrame:
        """
        Calculates asset returns based on defined method and parameters

        Parameters
        ----------
        method: str
            Options = ["daily", "rolling", "collapse"]
            daily: calculates daily percentage change
            rolling: calculates rolling percentage change based on window, user passes in a parameter window=?
            collapse: calculates percentage change based on window, user passes in a parameter window=?
                e.g.: if window=22, output is the return between each 22 day interval from the beginning
            Aditional option: calculates continuous return by passing in log=True, or discrete otherwise
        **kwargs: arguments passed into return_formula()

        Returns
        -------
        pd.DataFrame
            Returns a pandas DataFrame of asset returns
        """
        price_matrix = self.price_matrix
        index = self.index

        if method == "daily":
            return_matrix, return_idx = Returns.return_formula(
                price_matrix, index, window=1, roll=True, **kwargs
            )
        elif method == "rolling":
            return_matrix, return_idx = Returns.return_formula(
                price_matrix, index, roll=True, **kwargs
            )
        elif method == "collapse":
            return_matrix, return_idx = Returns.return_formula(
                price_matrix, index, roll=False, **kwargs
            )
        else:
            print(
                "What is going on? Invalid method! Valid Inputs: daily, rolling, collapse"
            )

        return pd.DataFrame(return_matrix.T, columns=self.assets, index=return_idx)

    def compute_mean_return(
        self, method: str, time_scaling: int = 252, **kwargs
    ) -> pd.Series:
        """
        Calculates mean historical asset returns to be used in mean-variance optimizer

        Parameters
        ----------
        method: str
            Options = ["arithmetic", "geometric"]
            arithmetic: Calculates the arithmetic mean of return, all paramters in compute_returns() can be passed in as additional arguments
            geometric: Calculates the geometric mean from first to last observation

        time_scaling: int, optional
            Annualizes daily mean return, by default 252

        **kwargs: additional arguments if using arithmetic

        Returns
        -------
        pd.Series
            Returns a pandas Series of asset mean returns
        """
        price_matrix = self.price_matrix
        index = self.index

        return_matrix, return_idx = Returns.return_formula(
            price_matrix, index, **kwargs
        )

        if method == "arithmetic":
            mean_return = np.mean(return_matrix, axis=1) * time_scaling
        elif method == "geometric":
            mean_return = (
                (price_matrix[:, -1] / price_matrix[:, 0])
                ** (time_scaling / price_matrix.shape[1])
            ) - 1
        else:
            print(
                "What is going on? Invalid method! Valid Inputs: arithmetic, geometric"
            )

        return pd.Series(mean_return, index=self.assets)


In [6]:
# Call functions
returns_generator = Returns(df_prices)
df_returns = returns_generator.compute_returns(method="daily")
mu_return_geom = returns_generator.compute_mean_return(method="geometric")
mu_return_arit = returns_generator.compute_mean_return(method="arithmetic")


## Compute covariance matrix

In [7]:
# Define functions
class Risks:
    def __init__(self, returns_data: pd.DataFrame):
        """
        Initializes the Risks class instance with asset returns data

        Parameters
        ----------
        returns_data : pd.DataFrame
            Price data of assets
        """
        self.return_matrix = returns_data.values.T
        self.assets = returns_data.columns

    def semi_cov(
        self,
        return_matrix: np.ndarray,
        bm_return: float = 0.0001,
        assume_zero: bool = False,
    ) -> np.ndarray:
        """
        Computes the semi-covariance matrix

        Parameters
        ----------
        return_matrix : np.ndarray
            Array of returns
        bm_return : float, optional
            Ignores all individual asset returns above the bm_return when calculating covariance, by default 0.0001
        assume_zero : bool, optional
            Long term daily mean return for an individual asset is sometimes assumed to be 0, by default False

        Returns
        -------
        np.ndarray
            Matrix of returns to use in semi-covariance estimation
        """
        return_matrix_copy = return_matrix.copy()

        def adjust_return_vector(return_vector: np.ndarray, bm_return: float):
            return_vector[return_vector >= bm_return] = np.mean(
                return_vector[return_vector < bm_return]
            )

            return return_vector

        return_matrix_copy = (
            np.fmin(return_matrix_copy - bm_return, 0)
            if assume_zero
            else np.apply_along_axis(
                adjust_return_vector, axis=1, arr=return_matrix_copy, bm_return=bm_return
            )
        )

        return return_matrix_copy

    def construct_weights(self, return_matrix: np.ndarray) -> np.ndarray:
        """
        Customized weight array

        Parameters
        ----------
        return_matrix : np.ndarray
            Array of returns

        Returns
        -------
        np.ndarray
            Returns array of weights for each asset (can be not all equal, in progress)
        """

        return np.repeat(
            np.divide(
                1, return_matrix).shape[1], repeats=return_matrix.shape[1]
        )

    @staticmethod
    def find_cov(
        return_matrix: np.ndarray, weight_factor: np.ndarray, builtin: bool
    ) -> np.ndarray:
        """
        Estimate a covariance matrix, given data and weights

        Parameters
        ----------
        return_matrix : np.ndarray
            Array of returns
        weight_factor : np.ndarray
            Array of observation vector weights. These relative weights are typically large for observations considered “important” and smaller for observations considered less “important”
        builtin : bool
            If True then calls np.cov() to calculate, otherwise use matrix calculation method written in the class

        Returns
        -------
        np.ndarray
            The covariance matrix of the asset returns
        """
        return np.cov(return_matrix, aweights=weight_factor)

    def sample_cov(
        self,
        return_matrix: np.ndarray,
        unit_time: int = 1,
        weights: np.ndarray = None,
        builtin: bool = False,
        **kwargs
    )-> np.ndarray:
        """
        Sample covariance computation

        Parameters
        ----------
        return_matrix : np.ndarray
            Array of 
        unit_time : int
            Frequency of returns, by default 1 (daily)
        weights : np.ndarray, optional
            Array of observation weights, by default None
        builtin : bool, optional
            If True then calls np.cov() to calculate, otherwise use matrix calculation method written in the class, by default False

        Returns
        -------
        np.ndarray
            Array of sample covariance values
        """
        weights = self.construct_weights(return_matrix)

        return Risks.find_cov(return_matrix, weights, builtin) * unit_time

    def scikit_cov_technique(
        self, return_matrix: np.ndarray, technique: str, time_scaling: int = 252, **kwargs
    ) -> np.ndarray:
        """
        Using sklearn.covariance methods to construct covariance matrix

        Parameters
        ----------
        return_matrix : np.ndarray
            Array of returns
        technique : str
            Options to select sklearn.covariance methods
        time_scaling : int, optional
            Annualize covariance matrix (assuming daily input), by default 252

        Returns
        -------
        np.ndarray
            Returns covariance matrix in it's raw form
        """
        technique_dict = {
            "EmpiricalCovariance": EmpiricalCovariance,
            "EllipticEnvelope": EllipticEnvelope,
            "GraphicalLasso": GraphicalLasso,
            "GraphicalLassoCV": GraphicalLassoCV,
            "LedoitWolf": LedoitWolf,
            "MinCovDet": MinCovDet,
            "OAS": OAS,
            "ShrunkCovariance": ShrunkCovariance,
        }

        try:
            return (
                technique_dict[technique](
                    **kwargs).fit(return_matrix.T).covariance_
                * time_scaling
            )
        except KeyError:
            print(
                "What is going on? Invalid technique! Valid inputs: EmpiricalCovariance, EllipticEnvelope, GraphicalLasso, GraphicalLassoCV, LedoitWolf, MinCovDet, OAS, ShrunkCovariance"
            )

    def compute_cov_matrix(
        self,
        technique: str = "sample",
        semi: bool = False,
        time_scaling: int = 252,
        builtin: bool = False,
        weights: np.ndarray = None,
        bm_return: float = 0.00025,
        assume_zero: bool = False,
        normalize: bool = False,
        **kwargs
    ) -> pd.DataFrame:
        """
        Calculates covariance matrix given the return data input

        Parameters
        ----------
        technique : str, optional
            additional_options: ["EmpiricalCovariance", "EllipticEnvelope", "GraphicalLasso", "GraphicalLassoCV",
                                    "LedoitWolf", "MinCovDet", "OAS", "ShrunkCovariance"]
            Specifies the calculation technique for the covariance matrix, by default "sample"
        semi : bool, optional
            If True, returns a semivariance matrix that emphasizes on downside portfolio variance, by default False
        time_scaling : int, optional
            Default annualizes the covariance matrix (assuming daily return is the input), by default 252
        builtin : bool, optional
            If True then calls np.cov() to calculate, otherwise use matrix calculation method written in the class, by default False
        weights : np.ndarray, optional
            Array of observation vector weights, by default None
        bm_return : float, optional
            additional parameter for calculating semivariance matrix, by default 0.00025
        assume_zero : bool, optional
            Long term daily mean return for an individual asset is sometimes assumed to be 0, by default False
        normalize : bool, optional
            To normalize the covariance matrix. In the specific case for covariance matrix, a normalized covariance
            matrix is a correlation matrix, by default False

        Returns
        -------
        pd.DataFrame
            Returns the covariance matrix as a pandas DataFrame
        """
        return_matrix = self.return_matrix

        if semi:
            return_matrix = self.semi_cov(
                return_matrix, bm_return=bm_return, assume_zero=assume_zero
            )

        if technique == "sample":
            cov_matrix = self.sample_cov(
                return_matrix, time_scaling, builtin=builtin, weights=weights, **kwargs
            )
        else:
            cov_matrix = self.scikit_cov_technique(
                return_matrix, technique, time_scaling, **kwargs
            )

        if normalize:
            cov_mat = cov_mat * np.dot(
                ((np.diag(cov_mat)) ** -0.5).reshape(-1, 1),
                ((np.diag(cov_mat)) ** -0.5).reshape(1, -1),
            )

        return pd.DataFrame(cov_matrix, index=self.assets, columns=self.assets)


In [8]:
cov_generator = Risks(df_returns)
cov_matrix = cov_generator.compute_cov_matrix()


  np.divide(


## Declare metrics, constraints and objectives

In [19]:
# TODO: add utility functions
class Metrics:
    def __init__(self, return_vector: np.ndarray, moment_matrix: np.ndarray, assets: list, moment: int = 2):
        """
        Initializes a Metrics instance with parameters to compute portfolio metrics

        Parameters
        ----------
        return_vector : np.ndarray
            Vector of mean returns
        moment_matrix : np.ndarray
            Covariance matrix
        assets : list[str]
            List of asset names
        moment : int, optional
            The order of moment matrix, by default 2
        """
        self.return_vector = return_vector
        self.moment_matrix = moment_matrix
        self.moment = moment
        self.assets = assets

        self.method_dict = {"leverage": self.leverage,
                            "num_assets": self.num_assets,
                            "concentration": self.concentration,
                            "correlation": self.correlation,
                            "diversification": self.diversification,
                            "volatility": self.volatility,
                            "risk_parity": self.risk_parity,
                            "expected_return": self.expected_return,
                            "sharpe": self.sharpe}

    # Weight related portfolio metrics

    def leverage(self, w: np.ndarray) -> float:
        """
        Computes the leverage of the portfolio based on weights

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Total leverage of portfolio
        """
        return np.sum(np.sqrt(np.square(w)))

    def num_assets(self, w: np.ndarray) -> int:
        """
        Computes the number of assets in the portfolio based on weights

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        int
            Number of assets
        """
        return len(w[np.round(w, 4) != 0])

    def concentration(self, w: np.ndarray, top_holdings:int) -> float:
        """
        Computes the % concentration of the portfolio based on weights

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights
        top_holdings : int
            _description_

        Returns
        -------
        float
            The percentage of the portfolio formed by the top_holdings number of assets
        """
        return -np.sum(np.partition(-np.sqrt(np.square(w)), top_holdings)[:top_holdings])/np.sum(np.sqrt(np.square(w)))
    
    # Risk portfolio metrics

    def correlation(self, w: np.ndarray) -> float:
        """
        Computes the portfolio correlation coefficient based on weights

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Correlation coefficient
        """
        corr_matrix = self.moment_matrix * np.dot(((np.diag(self.moment_matrix)) ** -0.5).reshape(-1, 1),
                                            ((np.diag(self.moment_matrix)) ** -0.5).reshape(1, -1))

        return w @ corr_matrix @ w.T

    def diversification(self, w: np.ndarray) -> float:
        """
        Computes the portfolio diversification based on weights

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Ratio of the weighted average of volatilities divided by the portfolio volatility
        """
        std_arr = np.diag(self.moment_matrix) ** 0.5

        return (w @ std_arr)/ np.sqrt(w @ self.moment_matrix @ w.T)
    
    def volatility(self, w: np.ndarray) -> float:
        """
        Computes the portfolio volatility based on weights

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Standard deviation of portfolio returns
        """
        return np.sqrt(w @ self.moment_matrix @ w.T)
    
    def variance(self, w: np.ndarray) -> float:
        """
        Computes the portfolio variance based on weights

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Variance of portfolio returns
        """
        return w @ self.moment_matrix @ w.T

    def risk_parity(self, w: np.ndarray) -> float:
        """
        Computes the portfolio risk-parity based on weights

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Risk-parity of portfolio
        """
        return 0.5 * w @ self.moment_matrix @ w.T - np.sum(np.log(w))/len(self.assets)

    # Risk-reward portfolio metrics

    def expected_return(self, w: np.ndarray) -> float:
        """
        Compute the portfolio expected return based on weights

        Parameters
        ----------
        w : np.ndarray
            

        Returns
        -------
        float
            _description_
        """
        return w @ self.return_vector
        
    def sharpe(self, w: np.ndarray, rfr: float=0.04)-> float:
        """
        Calculates the Sharpe ratio of the portfolio
        *If the covariance matrix is semivariance matrix, then it calculates the Sortino ratio

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights
        rfr : float, optional
            Constant risk free rate of return, by default 0.04

        Returns
        -------
        float
            Sharpe ratio of portfolio
        """
        assert rfr > 0, "Risk free rate must be greater than 0."

        return (self.expected_return(w) - rfr) / self.volatility(w)
    
    # Constant portfolio weights

    def inverse_volatility(self, leverage: float) -> np.ndarray:
        """
        Weights of the portfolio are based on the inverse volatility of the individual assets

        Parameters
        ----------
        leverage : float
            Leverage coefficient

        Returns
        -------
        np.ndarray
            Array of portfolio weights
        """
        std_arr = np.diag(self.moment_matrix) ** 0.5
        
        return (1/std_arr) / np.sum(1/std_arr) * leverage
    
    def inverse_variance(self, leverage: float) -> np.ndarray:
        """
        Weights of the portfolio are based on the inverse variance of the individual assets

        Parameters
        ----------
        leverage : float
            Leverage coefficient

        Returns
        -------
        np.ndarray
            Array of portfolio weights
        """
        var_arr = np.diag(self.moment_matrix)

        return (1/var_arr) / np.sum(1/var_arr) * leverage
    
    def equal_weight(self, leverage: float) -> np.ndarray:
        """
        Equal weight portfolio

        Parameters
        ----------
        leverage : float
            Leverage coefficient

        Returns
        -------
        np.ndarray
            Array of portfolio weights
        """
        return np.repeat(leverage/len(self.assets), len(self.assets))

## Define objective function

In [None]:
class Objective(Metrics):
    def __init__(self, return_vector: np.ndarray, moment_matrix: np.ndarray, assets: list, moment: int = 2):
        # Same parameters as Metrics()
        super().__init__(return_vector, moment_matrix, assets, moment)

        self.method_dict = {"quadratic_utility": self.efficient_frontier,
                            "equal_risk_parity": self.equal_risk_parity,
                            "min_correlation": self.min_correlation,
                            "min_volatility": self.min_volatility,
                            "min_variance": self.min_variance,
                            "max_return": self.max_return,
                            "max_diversification": self.max_diversification,
                            "max_sharpe": self.max_sharpe,
                            "inverse_volatility": self.inverse_volatility,
                            "inverse_variance": self.inverse_variance,
                            "equal_weight": self.equal_weight}

    # Risk related objective functions

    def equal_risk_parity(self, w: np.ndarray) -> float:
        """
        Individual assets contribute equal amounts of risk to the portfolio

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Risk parity global value
        """
        return self.risk_parity(w)

    def min_correlation(self, w: np.ndarray) -> float:
        """
        Minimize portfolio correlation factor

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Correlation factor global value
        """
        return self.correlation(w)

    def min_volatility(self, w: np.ndarray) -> float:
        """
        Minimize portfolio volatility

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Volatility global value
        """
        return self.volatility(w)

    def min_variance(self, w: np.ndarray) -> float:
        """
        Minimize portfolio variance

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Variance global value
        """
        return self.variance(w)

    def max_diversification(self, w: np.ndarray) -> float:
        """
        Maximize portfolio diversification

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Diversification factor global value
        """
        return -self.diversification(w)

    # Risk-reward related objective functions

    def efficient_frontier(self, w: np.ndarray, aversion: float) -> float:
        """
        Maximize return with lowest variance (quadratic utility)

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights
        aversion : float
            Risk aversion parameter

        Returns
        -------
        float
            Quadratic utility global value
        """
        return -(self.expected_return(w) - 0.5 * aversion * self.variance(w))

    def max_return(self, w: np.ndarray) -> float:
        """
        Maximize return regardless of risk

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights

        Returns
        -------
        float
            Return global value
        """
        return -self.expected_return(w)

    def max_sharpe(self, w: np.ndarray, rfr: float) -> float:
        """
        Maximize Sharpe ratio

        Parameters
        ----------
        w : np.ndarray
            Array of portfolio weights
        rfr : float
            Risk free rate of return

        Returns
        -------
        float
            Sharpe ratio global value
        """
        return -self.sharpe(w, rfr)

    def create_objective(self, objective_type: str, **kwargs) -> function:
        """
        Function to construct objective function

        Parameters
        ----------
        objective_type : str
            String to specify the type of objective function

        Returns
        -------
        function
            If weight function is not numerical (array of weights) return objective function
        """
        if objective_type in {"equal_weight", "inverse_volatility", "inverse_variance"}:
            return self.method_dict["objective_type"](**kwargs)
        else:
            return self.method_dict[objective_type]


## Define portfolio constraints

In [None]:
class Constraints(Metrics):
    def __init__(self, return_vector:np.ndarray, moment_matrix: np.ndarray, assets: list, moment:int=2):
        # Same parameters as Metrics()
        super().__init__(return_vector, moment_matrix, assets, moment)

        self.method_dict = {"weight": self.weight,
                            "num_assets": self.num_assets_const,
                            "concentration": self.concentration_const,
                            "expected_return": self.expected_return_const,
                            "sharpe": self.sharpe_const,
                            "volatility": self.volatility_const,
                            "variance": self.moment_const}
    
    @staticmethod
    def construct_weight_bound(size:int, init_bound:tuple, weight_bound: np.ndarray):
        #individual_bound = init_bound
        pass

    def weight(self, weight_bound:np.ndarray, leverage: float):
        pass
    
    def create_constraint(self, constraint_type:str, **kwargs):
        pass

## Construct optimizer

## Solve and check summary