In [1]:
import pandas as pd
import numpy as np
import requests
from bs4 import BeautifulSoup
import yfinance as yf
import time
from tqdm import tqdm
import matplotlib.pyplot as plt #not needed to run program
import datetime
import logging
from pypfopt import risk_models, expected_returns, EfficientFrontier, objective_functions
import itertools
import warnings
# import plotly.express as px
# import plotly.graph_objects as go

In [2]:
logging.basicConfig(format = '%(asctime)s:%(levelname)s :%(message)s',
                        datefmt = '%Y-%m-%d %H:%M:%S',
                        filename = 'logs/run_main.log',
                        level=logging.INFO)

**1. Download stock returns**

*1.1 Get tickers*

In [3]:
def get_dax_tickers():
    """
    Retrieves the tickers of companies listed in the DAX index from Wikipedia and returns a list of tickers.

    Returns:
        tickers_dax (list): A list of tickers of companies listed in the DAX index.

    Raises:
        Exception: If the website cannot be reached or does not exist.
        Exception: If the table with id 'constituents' cannot be found in the website.
        Exception: If the 'Ticker' column cannot be extracted from the table.

    """
    url = 'https://en.wikipedia.org/wiki/DAX'

    #Check if the connection is succesful
    try:
        response = requests.get(url, verify=False)
        if response.ok:
            soup = BeautifulSoup(response.text, 'html.parser')
        else:
            raise Exception(f'Cannot reach website {url}')
    except:
        raise Exception(f'Website {url} does not exist')

    #check if table 'constituents' exists
    wiki_table_id = 'constituents'
    try:
        indiatable = soup.find('table',{'id': wiki_table_id,'class':'wikitable'})
        df = pd.read_html(str(indiatable))
    except:
        raise Exception(f'Table {wiki_table_id} cannot be found in {url}')

    #Extract columns from table
    extract_cols = 'Ticker'
    try:
        tickers_dax = pd.DataFrame(df[0])[extract_cols]
        tickers_dax = tickers_dax.tolist()
    except KeyError as e:
        raise Exception((e.args[0]).replace('index', 'column index of table constituents'))

    logging.info(f'DAX40 tickers loaded successfully')

    return tickers_dax

In [4]:
def get_custom_tickers():
    """
    Asks the user to input a list of tickers separated by commas and returns the list of tickers.

    Returns:
        tickers_custom (list): The list of custom tickers entered by the user.

    Raises:
        Exception: If the length of the input data is less than 2.

    """
    while True:
        tickers_custom = input("Enter a list of tickers separated by commas. Use tickers from https://finance.yahoo.com/").strip().split(',')
        tickers_custom = [ticker.strip().upper() for ticker in tickers_custom]  # Convert tickers to uppercase and remove whitespace
        
        if len(tickers_custom) >= 2:
            break
        else:
            print("Invalid input. Please enter at least 2 tickers.")

    return tickers_custom

In [5]:
def ask_user_tickers():
    """
    Asks the user whether to use default values or custom values for the tickers portfolio.

    Returns:
    - tickers_portfolio (list): The tickers portfolio based on the user's choice.

    """
    while True:
        answer = input("Do you want to use DAX40 constituents as default tickers? (yes/no): ").strip().lower()
        if answer == 'yes':
            tickers_portfolio = get_dax_tickers()  # Default values using get_dax_tickers() function
            break
        elif answer == 'no':
            tickers_portfolio = get_custom_tickers()  # Custom values using get_custom_tickers() function
            break
        else:
            print("Invalid answer. Please enter either 'yes' or 'no'.")

    return tickers_portfolio


*1.2 Get date range*

In [6]:
def get_date_input(question):
    """
    Prompts the user to input a date in the format 'YYYY-MM-DD' and returns a datetime object.

    Args:
        question (str): The question to display when prompting for the date input.

    Returns:
        datetime.datetime: A datetime object representing the inputted date.

    """
    while True:
        date_input = input(question)
        try:
            date_obj = datetime.datetime.strptime(date_input, "%Y-%m-%d")
            return date_obj
        except ValueError:
            print("Invalid date format. Please use the format 'YYYY-MM-DD'.")


In [7]:
def ask_custom_date_range():
    """
    Prompts the user to enter a start date and end date in 'YYYY-MM-DD' format and returns the date range.

    Returns:
        tuple: A tuple containing two strings representing the start date and end date in 'YYYY-MM-DD' format.

    """
    while True:
        start_date = get_date_input("Enter start date in 'YYYY-MM-DD' format:")
        end_date = get_date_input("Enter end date in 'YYYY-MM-DD' format:")

        if end_date > start_date:
            start_date = start_date.strftime("%Y-%m-%d")
            end_date = end_date.strftime("%Y-%m-%d")
            return start_date, end_date
        else:
            print("Invalid date range. Start date must be before end date.")


In [8]:
def ask_user_date_range():
    """
    Prompts the user to choose between using a default date range or entering a custom date range.

    Returns:
        tuple: A tuple containing two strings representing the start date and end date in 'YYYY-MM-DD' format.

    Raises:
        ValueError: If the user provides an invalid answer.

    """
    while True:
        answer = input("Do you want to use the default date range? (yes/no): ").strip().lower()
        if answer == 'yes':
            start_date, end_date = ("2013-01-01", "2023-01-01")
            break
        elif answer == 'no':
            start_date, end_date = ask_custom_date_range()
            break
        else:
            print("Invalid answer. Please enter either 'yes' or 'no'.")

    return start_date, end_date


*1.3 Get yahoo finance data*

In [9]:
def calculate_stock_returns(prices):
    """
    Calculates the series of returns from a series of prices.

    Args:
        prices (pd.Series): A pandas Series containing the prices.

    Returns:
        returns (pd.Series): A pandas Series containing the calculated returns.

    """
    try:
        returns = prices.pct_change().dropna()
    except:
        raise Exception("Error while calculating returns")
    return returns


In [10]:
def convert_datetime_index_to_date(pandas_series):
    """
    Converts the index of a pandas Series object from datetime to date.

    Args:
        pandas_series (pandas.Series): The pandas Series object with a datetime index.

    Returns:
        pandas.Series: The pandas Series object with the index converted to date.

    Raises:
        Exception: If the index of the pandas Series cannot be converted to date.

    """
    try:
        pandas_series.index = pd.to_datetime(pandas_series.index.date)
    except AttributeError as e:
        raise Exception(f'Could not convert index of a dataframe to date. Error message: {e}')

    return pandas_series


In [11]:
def get_single_stock_returns(ticker, start_date, end_date):
    """
    Retrieves the monthly returns of a single stock for a specified time period.

    Args:
        ticker (str): The ticker symbol of the stock to retrieve data for.
        start_date (str): The start date of the time period in "YYYY-MM-DD" format.
        end_date (str): The end date of the time period in "YYYY-MM-DD" format.

    Returns:
        pandas.Series: A pandas Series object containing the monthly returns of the stock.

    Raises:
        ValueError: If the stock either does not exist or does not have data for the given time period.

    """
    stock_data = yf.Ticker(ticker)
    stock_price = stock_data.history(start=start_date, end=end_date, interval='1mo').Close

    if stock_price.empty:
        raise ValueError(f'{ticker} either does not exist or does not have data for a given time period')

    stock_returns = calculate_stock_returns(stock_price)
    stock_returns = convert_datetime_index_to_date(stock_returns)

    return stock_returns


In [12]:
def get_stock_info(ticker):
    """
    Retrieves information about a stock given its ticker symbol.

    Args:
        ticker (str): Ticker symbol of the stock.

    Returns:
        stock_name (str): Name of the stock.
        stock_country (str): Country of the stock.
        stock_sector (str): Sector of the stock.

    Raises:
        ValueError: If an error occurs while retrieving the stock information.

    """
    try:
        stock_info = yf.Ticker(ticker).info
        stock_name = stock_info['longName'].lower().capitalize()
        stock_country = stock_info['country'].lower().capitalize()
        stock_sector = stock_info['sector'].lower().capitalize()
        return stock_name, stock_country, stock_sector

    except Exception as e:
        raise ValueError(f"An error occurred while retrieving stock information for {ticker}: {str(e)}")


In [13]:
def get_batch_stock_data(tickers, start_date, end_date):
    """
    Retrieves batch stock data for multiple tickers, including stock names, countries, sectors, and monthly returns.

    Args:
        tickers (list): A list of ticker symbols for the stocks to retrieve data for.
        start_date (str): The start date of the time period in "YYYY-MM-DD" format.
        end_date (str): The end date of the time period in "YYYY-MM-DD" format.

    Returns:
        tuple: A tuple containing two elements:
            - A dictionary (stock_data) with ticker symbols as keys and dictionaries as values, containing the following information:
                - 'Name': The name of the stock.
                - 'Country': The country the stock belongs to.
                - 'Sector': The sector the stock belongs to.
                - 'Monthly_prices': A pandas Series object containing the monthly returns of the stock.
            - A list (tickers_fail) containing ticker symbols that either do not exist or do not have data for the given time period.
    """
    stock_data = {}
    tickers_success = []
    tickers_fail = []

    with tqdm(total=len(tickers)) as pbar:
        for single_ticker in tickers:
            pbar.set_postfix_str(single_ticker)
            try:
                stock_name, stock_country, stock_sector = get_stock_info(single_ticker)
                stock_returns = get_single_stock_returns(single_ticker, start_date=start_date, end_date=end_date)
                stock_data[single_ticker] = {'Name': stock_name, 'Country': stock_country, 
                                             'Sector': stock_sector, 'Monthly_prices': stock_returns
                                             }
                tickers_success.append(single_ticker)
            except ValueError:
                tickers_fail.append(single_ticker)
                pass
            pbar.update()
            time.sleep(0.5)

    logging.info(f'Successfully downloaded stocks: {tickers_success}')
    logging.warning(f'Non-existent tickers or no data for given time interval: {tickers_fail}')

    return stock_data, tickers_fail


*1.4 Final data retrieval*

In [14]:
def full_stock_data_retrieval():
    """
    Retrieves full stock data for a user-defined portfolio within a specified date range.

    Returns:
        tuple: A tuple containing two elements:
            - A dictionary (stock_data) with ticker symbols as keys and dictionaries as values, containing the following information:
                - 'Name': The name of the stock.
                - 'Country': The country the stock belongs to.
                - 'Sector': The sector the stock belongs to.
                - 'Monthly_prices': A pandas Series object containing the monthly returns of the stock.
            - A list (tickers_failed) containing ticker symbols that either do not exist or do not have data for the given time period.

    """
    tickers_portfolio = ask_user_tickers()
    stock_start_date, stock_end_date = ask_user_date_range()
    stock_data, tickers_failed = get_batch_stock_data(tickers_portfolio, start_date=stock_start_date, end_date=stock_end_date)
    return stock_data, tickers_failed


In [15]:
def ask_full_stock_data_retrieval():
    """
    Prompts the user to retrieve full stock data for a user-defined portfolio within a specified date range.

    Returns:
        dict: A dictionary containing retrieved stock data with ticker symbols as keys and dictionaries as values, containing the following information:
            - 'Name': The name of the stock.
            - 'Country': The country the stock belongs to.
            - 'Sector': The sector the stock belongs to.
            - 'Monthly_prices': A pandas Series object containing the monthly returns of the stock.

    """
    while True:
        stock_data, tickers_failed = full_stock_data_retrieval()
        
        if len(tickers_failed) == 0:
            break

        else:
            answer = input(f"Tickers {tickers_failed} either do not exist or they have no data for the given time range. Would you like to provide the ticker list and date range again? (yes/no): ")
            
            if answer == 'yes':
                print("Enter the required data again.")
            elif answer == 'no':
                break
            else:
                print("Invalid answer. Please enter either 'yes' or 'no'.")

    return stock_data


In [34]:
stock_data = ask_full_stock_data_retrieval()

100%|██████████| 8/8 [00:08<00:00,  1.08s/it, ZAL.DE]


In [35]:
print(stock_data)

{'META': {'Name': 'Meta platforms, inc.', 'Country': 'United states', 'Sector': 'Communication services', 'Monthly_prices': 2013-02-01   -0.120400
2013-03-01   -0.061284
2013-04-01    0.085614
2013-05-01   -0.123154
2013-06-01    0.021766
                ...   
2022-08-01    0.024073
2022-09-01   -0.167250
2022-10-01   -0.313384
2022-11-01    0.267711
2022-12-01    0.018967
Name: Close, Length: 119, dtype: float64}, 'TSLA': {'Name': 'Tesla, inc.', 'Country': 'United states', 'Sector': 'Consumer cyclical', 'Monthly_prices': 2013-02-01   -0.071448
2013-03-01    0.087855
2013-04-01    0.424914
2013-05-01    0.810706
2013-06-01    0.098200
                ...   
2022-08-01   -0.072489
2022-09-01   -0.037589
2022-10-01   -0.142168
2022-11-01   -0.144326
2022-12-01   -0.367334
Name: Close, Length: 119, dtype: float64}, 'GOOG': {'Name': 'Alphabet inc.', 'Country': 'United states', 'Sector': 'Communication services', 'Monthly_prices': 2013-02-01    0.060223
2013-03-01   -0.008749
2013-04-01   

**3. Miniumum variance portfolio** <br>
*Machine learning approach - tuning hyperparameters to achieve portfolio with least variance*

*3.1. Get strategy and hyperparameter candidates for covariance, mean and penalty from user*

In [23]:
def get_portfolio_strategy():
    """
    Prompts the user to specify a portfolio strategy.

    Valid options are:
    - 1: Minimum Variance
    - 2: Maximum Sharpe Ratio

    Returns:
    - If the user enters '1', returns "minimum variance".
    - If the user enters '2', returns "maximum sharpe".
    - If the user enters any other value, displays an error message and asks for input again.
    """
    while True:
        strategy = input("Specify your portfolio strategy (1 - Minimum Variance, 2 - Maximum Sharpe Ratio): ")
        
        if strategy == '1':
            return "minimum variance"
        elif strategy == '2':
            return "maximum sharpe"
        else:
            print("Invalid input. Please enter either 1 or 2.")


In [25]:
def get_covariance_estimates():
    """
    Prompts the user to enter one or more integer values from 1 to 8, corresponding to different covariance estimates.

    The valid integer values and their corresponding estimates are:
    - 1: sample_cov
    - 2: semicovariance
    - 3: exp_cov
    - 4: ledoit_wolf
    - 5: ledoit_wolf_constant_variance
    - 6: ledoit_wolf_single_factor
    - 7: ledoit_wolf_constant_correlation
    - 8: oracle_approximating

    Returns:
    - A list of the selected covariance estimate values.
    """
    cov_est_encoding = {
        1: "sample_cov",
        2: "semicovariance",
        3: "exp_cov",
        4: "ledoit_wolf",
        5: "ledoit_wolf_constant_variance",
        6: "ledoit_wolf_single_factor",
        7: "ledoit_wolf_constant_correlation",
        8: "oracle_approximating"
    }
        
    while True:
        values = input(f"Please enter one or more integer values from 1 to 8, separated by commas. Values correspond to following esimates: {cov_est_encoding}").strip().split(',')

        valid_values = []
        invalid_values = []
        
        for value in values:
            if int(value) in cov_est_encoding.keys():
                valid_values.append(cov_est_encoding[int(value)])
            else:
                invalid_values.append(value)
        
        if invalid_values:
            print(f"Invalid input: {invalid_values} Please enter only integ9er values from 1 to 8.")
        else:
            break
    
    return valid_values

In [52]:
def get_covest_candidates():
    """
    Prompts the user to decide whether to search for the best covariance estimate or not.

    Returns:
    - If the user enters 'no', returns "mean_historical_return".
    - If the user enters 'yes', prompts for covariance estimate values using the `get_covariance_estimates()` function,
      and returns the selected covariance estimate values.
    """    
    while True:
        answer = input("Do you want to search for best covariance estimate? (yes/no):").strip().lower()
        if answer == 'no':
            cov_est_candidate = ["sample_cov"]
            break
        elif answer == 'yes':
            cov_est_candidate = get_covariance_estimates()
            break
        else:
            print("Invalid answer. Please enter either 'yes' or 'no'.")

    return cov_est_candidate

In [47]:
def get_mean_estimates():
    """
    Prompts the user to enter one or more integer values from 1 to 3, corresponding to different mean estimates.

    The valid integer values and their corresponding estimates are:
    - 1: mean_historical_return
    - 2: ema_historical_return
    - 3: capm_return

    Returns:
    - A list of the selected mean estimate values.
    """    
    mu_est_encoding = {
        1: "mean_historical_return",
        2: "ema_historical_return",
        3: "capm_return"
    }
        
    while True:
        values = input(f"Please enter one or more integer values from 1 to 3, separated by commas. Values correspond to following esimates: {mu_est_encoding} ").strip().split(',')

        valid_values = []
        invalid_values = []
        
        for value in values:
            if int(value) in mu_est_encoding.keys():
                valid_values.append(mu_est_encoding[int(value)])
            else:
                invalid_values.append(value)
        
        if invalid_values:
            print(f"Invalid input: {invalid_values} Please enter only integer values from 1 to 8.")
        else:
            break
    
    return valid_values

In [53]:
def get_muest_candidates():
    """
    Prompts the user to decide whether to search for the best mean estimate or not.

    Returns:
    - If the user enters 'no', returns "mean_historical_return".
    - If the user enters 'yes', prompts for mean estimate values using the `get_mean_estimates()` function,
      and returns the selected mean estimate values.
    """
    while True:
        answer = input("Do you want to search for best mean estimate? (yes/no):").strip().lower()
        if answer == 'no':
            mu_est_candidate = ["mean_historical_return"]
            break
        elif answer == 'yes':
            mu_est_candidate = get_mean_estimates()
            break
        else:
            print("Invalid answer. Please enter either 'yes' or 'no'.")

    return mu_est_candidate

In [31]:
def get_shrinkage_estimates():
    """
    Prompts the user to enter one or more shrinkage values in double format, separated by commas.

    The valid shrinkage values are expected to be in the range of 0 to 2.

    Returns:
    - A list of the valid shrinkage values entered by the user.
    """
    while True:
        values = input("Enter shrinkage values in double format, separated by commas. Adivce to use values between 0 and 2").strip().split(',')

        valid_values = []
        invalid_values = []

        for value in values:
            try:
                double_value = float(value)
                valid_values.append(double_value)
            except ValueError:
                invalid_values.append(value)

        if invalid_values:
            print(f"Invalid input: {invalid_values}. Please enter only double values.")
        elif valid_values:
            break
        else:
            print("No valid input values provided. Please try again.")

    return valid_values


In [55]:
def get_shrinkage_candidates():
    """
    Prompts the user to decide whether to search for the best shrinkage estimate or not.

    Returns:
    - If the user enters 'no', returns 0.
    - If the user enters 'yes', prompts for shrinkage values using the `get_shrinkage_estimates()` function,
      and returns the selected shrinkage values.
    """
    while True:
        answer = input("Do you want to search for best shrinkage estimate? (yes/no):").strip().lower()
        if answer == 'no':
            shrinkage_est_candidate = [0]
            break
        elif answer == 'yes':
            shrinkage_est_candidate = get_shrinkage_estimates()
            break
        else:
            print("Invalid answer. Please enter either 'yes' or 'no'.")

    return shrinkage_est_candidate

*3.2. Portfolio optimization*

In [20]:
def stock_returns_to_df(stock_data_dict):
    """
    Converts a dictionary of stock data with monthly returns to a DataFrame.

    Args:
        stock_data_dict (dict): A dictionary containing stock data with monthly returns, where the keys represent stock tickers and the values are dictionaries containing information about each stock, including the 'Monthly_prices' Series.

    Returns:
        pandas.DataFrame: A DataFrame where the row index represents the dates stored in the 'Monthly_prices' Series, and the columns represent the stock tickers.

    Raises:
        Exception: If an error occurs during the conversion to DataFrame.

    """
    try:
        stock_returns_table = pd.DataFrame({ticker: data['Monthly_prices'] for ticker, data in stock_data_dict.items()})
        return stock_returns_table
    except Exception as e:
        raise(e)


In [60]:
def port_name(cov_est, mu_est, penalty):
    """
    Generates a name for a portfolio based on the given covariance estimation method, mean estimation method, and penalty value.

    Args:
        cov_est (str): The covariance estimation method.
        mu_est (str): The mean estimation method.
        penalty (float): The penalty value.

    Returns:
        str: The generated portfolio name.

    Raises:
        KeyError: If the provided covariance or mean estimation method is not implemented.
    """
    cov_est_encoding = {
        "sample_cov": 'sample',
        "semicovariance": 'semi',
        "exp_cov": 'exp',
        "ledoit_wolf": 'lw',
        "ledoit_wolf_constant_variance": 'lwcv',
        "ledoit_wolf_single_factor": 'lwsf',
        "ledoit_wolf_constant_correlation": 'lwcc',
        "oracle_approximating": 'oa'
    }

    mu_est_encoding = {
        "mean_historical_return": 'mean',
        "ema_historical_return": 'ema',
        "capm_return": 'capm'
    }

    mvp_name_template = 'port_{cov_est_name}_{mu_est_name}_{penalty_value}'

    try:
        port_name_filled = mvp_name_template.format(
            cov_est_name=cov_est_encoding[cov_est],
            mu_est_name=mu_est_encoding[mu_est],
            penalty_value=penalty
        )
        return port_name_filled

    except KeyError as k:
        raise KeyError(f'{k} is not implemented')


In [61]:
def mvp_weights(dataframe_returns, cov_est, mu_est, penalty):
    """
    Computes the minimum variance portfolio weights.

    Parameters:
        dataframe_returns (pandas.DataFrame): DataFrame containing asset returns.
        cov_est (str): Method to estimate the covariance matrix.
        mu_est (str): Method to estimate expected returns.
        penalty (float): L2 regularization penalty.

    Returns:
        dict or None: Dictionary of asset weights for the minimum variance portfolio or None if an error occurs.

    Raises:
        Exception: If an error occurs during the computation, an exception is raised with an error message.
    """
    try:
        mu = expected_returns.return_model(prices=dataframe_returns, returns_data=True, frequency=12, method=mu_est)  # estimates for expected returns
        cov_mat = risk_models.risk_matrix(prices=dataframe_returns, returns_data=True, frequency=12, method=cov_est)  # estimates for covariance matrix
        cov_mat_fix = risk_models.fix_nonpositive_semidefinite(matrix=cov_mat)  # fix matrix if it's non-positive semidefinite

        ef = EfficientFrontier(expected_returns=mu, cov_matrix=cov_mat_fix)
        ef.add_objective(objective_functions.L2_reg, gamma=penalty)
        ef.min_volatility()
        weights = ef.clean_weights()
        return weights
    
    except Exception as e:
        raise Exception(f'Error message: {e}')

In [62]:
def msr_weights(dataframe_returns, cov_est, mu_est, penalty):
    """
    Computes the maximum sharpe ratio portfolio weights.

    Parameters:
        dataframe_returns (pandas.DataFrame): DataFrame containing asset returns.
        cov_est (str): Method to estimate the covariance matrix.
        mu_est (str): Method to estimate expected returns.
        penalty (float, optional): L2 regularization penalty.

    Returns:
        dict or None: Dictionary of asset weights for the maximum sharpe ratio portfolio or None if an error occurs.

    Raises:
        Exception: If an error occurs during the computation, an exception is raised with an error message.
    """
    try:
        mu = expected_returns.return_model(prices=dataframe_returns, returns_data=True, frequency=12, method=mu_est)  # estimates for expected returns
        cov_mat = risk_models.risk_matrix(prices=dataframe_returns, returns_data=True, frequency=12, method=cov_est)  # estimates for covariance matrix
        cov_mat_fix = risk_models.fix_nonpositive_semidefinite(matrix=cov_mat)  # fix matrix if it's non-positive semidefinite

        ef = EfficientFrontier(expected_returns=mu, cov_matrix=cov_mat_fix)
        ef.add_objective(objective_functions.L2_reg, gamma=penalty)
        ef.max_sharpe(risk_free_rate=0)
        weights = ef.clean_weights()
        return weights
    
    except Exception as e:
        raise Exception(f'Error message: {e}')

In [63]:
def calculate_portfolio_weights(strategy, dataframe_returns, cov_est, mu_est, penalty):
    """
    Calculates portfolio weights based on the selected strategy.

    Args:
        strategy (str): The selected strategy. Can be either "minimum variance" or "maximum sharpe".
        dataframe_returns (pandas.DataFrame): A DataFrame containing historical returns data.
        cov_est (function): A function for estimating the covariance matrix of returns.
        mu_est (function): A function for estimating the expected returns.
        penalty (float): A penalty parameter for the selected strategy.

    Returns:
        list: A list of portfolio weights.

    Raises:
        ValueError: If an invalid strategy is provided.

    """
    if strategy == "minimum variance":
        return mvp_weights(dataframe_returns=dataframe_returns, cov_est=cov_est, mu_est=mu_est, penalty=penalty)
    elif strategy == "maximum sharpe":
        return msr_weights(dataframe_returns=dataframe_returns, cov_est=cov_est, mu_est=mu_est, penalty=penalty)
    else:
        raise ValueError("Invalid strategy. Please choose either 'minimum variance' or 'maximum sharpe'.")

In [64]:
def prepare_returns_is_oos(df_returns, index_in_sample, index_out_of_sample):
    """
    Prepares in-sample and out-of-sample stock returns DataFrames by removing columns with NaN values.

    Args:
        df_returns (pandas.DataFrame): The DataFrame containing stock returns.
        index_in_sample (list-like): The indices corresponding to the in-sample data.
        index_out_of_sample (list-like): The indices corresponding to the out-of-sample data.

    Returns:
        tuple: A tuple containing two DataFrames:
            - stock_returns_is_corrected: The in-sample stock returns DataFrame with NaN columns removed.
            - stock_returns_oos_corrected: The out-of-sample stock returns DataFrame with NaN columns removed.

    """
    try:
        stock_returns_is = df_returns.iloc[index_in_sample,]
        stock_returns_oos = df_returns.iloc[index_out_of_sample,]

        col_contain_na = stock_returns_is.columns[stock_returns_is.isna().any()]

        stock_returns_is_corrected = stock_returns_is.drop(col_contain_na, axis=1)
        stock_returns_oos_corrected = stock_returns_oos.drop(col_contain_na)

        return stock_returns_is_corrected, stock_returns_oos_corrected

    except Exception as e:
        raise(e)


In [65]:
def calculate_returns(portfolio_weights, stock_returns):
    """
    Calculate the portfolio return.

    Args:
        portfolio_weights (list or array-like): The weights of the assets in the portfolio.
        stock_returns (pandas.Series): The returns of the stocks in the portfolio.

    Returns:
        float or None: The calculated portfolio return. If an error occurs during calculation,
                      None is returned.

    """
    try:
        np_weights = pd.Series(portfolio_weights).sort_index().to_numpy()
        np_returns = stock_returns.sort_index().to_numpy()
        portfolio_return = np.dot(np_weights, np_returns)
        return portfolio_return
    except (AttributeError, KeyError, TypeError) as e:
        print(f"An error occurred while calculating portfolio returns: {e}")
        return None

In [101]:
def portfolio_oos_returns(strategy, dataframe_returns, is_period, cov_est, mu_est, penalty):
    """
    Calculates the out-of-sample portfolio returns based on the given historical returns data and parameters.

    Args:
        dataframe_returns (pandas.DataFrame): Historical returns data as a DataFrame.
        is_period (int): Length of the in-sample period.
        cov_est (str): Covariance estimation method.
        mu_est (str): Mean estimation method.
        penalty (float): Penalty value.

    Returns:
        pandas.Series: Out-of-sample portfolio returns.

    Raises:
        KeyError: If the provided covariance or mean estimation method is not implemented.
    """
    portfolio_name = port_name(cov_est=cov_est, mu_est=mu_est, penalty=penalty)

    n = dataframe_returns.shape[0]  # number of observations - 260
    n_is = is_period  # length of in-sample period - 24
    n_oos = n - n_is # length of out-of-sample period - 236

    date_oos = dataframe_returns.iloc[n_is:n].index
    port_oos_returns = []

    for i in range(n_oos):
            
        index_is = list(range(i, i + n_is))
        index_oos = n_is + i

        stock_returns_is, stock_returns_oos = prepare_returns_is_oos(df_returns = dataframe_returns, index_in_sample = index_is, index_out_of_sample = n_oos)

        #if error in calculating weights (due to problems with covariance matrix), assign zero as returns
        try:
            port_weights = calculate_portfolio_weights(strategy=strategy, dataframe_returns=stock_returns_is, cov_est=cov_est, mu_est=mu_est, penalty=penalty)
            port_return = calculate_returns(portfolio_weights=port_weights, stock_returns=stock_returns_oos)
            port_oos_returns.append(port_return)
        except:
            port_return = calculate_returns(portfolio_weights=port_weights, stock_returns=stock_returns_oos)
            port_oos_returns.append(port_return)
            
    port_oos_returns = pd.Series(data=port_oos_returns, index=date_oos, name=portfolio_name)

    return port_oos_returns


In [76]:
def annual_expected_returns(monthly_returns):
    """
    Calculate the annual expected returns from monthly returns.

    Parameters:
        monthly_returns (array-like): A sequence of monthly returns.

    Returns:
        float or None: The annual returns calculated from the mean of monthly returns, rounded to four decimal places.
                       Returns None if an error occurs during the calculation.

    Raises:
        None.
    """
    try:
        return_pa = monthly_returns.mean() * 12
        return_pa_round = np.round(return_pa, 4)
        return return_pa_round
    except Exception as e:
        print(f"An error occurred: {e}")
        return None


In [77]:
def annual_volatility(monthly_returns):

    """
    Calculate the annual volatility from monthly returns returns.

    Parameters:
        monthly_returns (array-like): A sequence of monthly stock returns.

    Returns:
        float or None: The annual volatility of the stock returns, rounded to four decimal places.
                       Returns None if an error occurs during the calculation.

    Raises:
        ValueError: If the length of monthly_returns is less than 2.
    """

    if len(monthly_returns) < 2:
            raise ValueError("At least two stock returns are required to calculate volatility.")    
    try:
        volatility_pa = monthly_returns.std()*np.sqrt(12)
        volatility_pa_round = np.round(volatility_pa, 4)
        return volatility_pa_round
    except Exception as e:
        print(f"An error occurred: {e}")
        return None


In [88]:
def annual_sharpe_ratio(annual_expected_returns, annual_volatility):
    """
    Calculates the annual Sharpe ratio.

    Parameters:
        annual_expected_returns (float): The annual expected returns.
        annual_volatility (float): The annual volatility.

    Returns:
        float: The rounded annual Sharpe ratio calculated as annual_expected_returns/annual_volatility.
    """
    try:
        sr_pa = (annual_expected_returns / annual_volatility)
        sr_pa_round = np.round(sr_pa, 4)
        return sr_pa_round
    except Exception as e:
        print(f"An error occurred: {e}")
        return None


In [90]:
def tune_portfolio(data, strategy, is_period=24, parameter_values = {'cov_est':["sample_cov"], 'mu_est':['mean_historical_return'], 'penalty':[0]}):
    """
    Tune minimum variance portfolio based on different parameter combinations.

    Parameters:
        dataframe_returns (DataFrame): Historical returns data.
        is_period (int): Number of periods.
        param_values (dict): Dictionary of parameter values.

    Returns:
        DataFrame: Results of the tuned portfolios.

    """
    df_data = stock_returns_to_df(data)

    estimation_progress_template = 'Covariance matrix: {cov_est} | Expected retuns: {mu_est} | Penalty value: {penalty}'  # holds info on calculated portfolio
    parameter_combinations = list(itertools.product(*parameter_values.values()))
    output = {}

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore")

        with tqdm(total = len(parameter_combinations)) as pbar:
            
            for parameters in parameter_combinations:
                parameters_selected = dict(zip(parameter_values.keys(), parameters))
                estimation_progress = estimation_progress_template.format(cov_est = parameters_selected['cov_est'],
                                                    mu_est = parameters_selected['mu_est'],
                                                    penalty = parameters_selected['penalty'])
                
                pbar.set_postfix_str(estimation_progress)
                
                try:
                    port_returns = portfolio_oos_returns(strategy=strategy, dataframe_returns=df_data, is_period=is_period, **parameters_selected)
                    output[port_returns.name] = {"Strategy": strategy, "Covariance estimator": parameters_selected['cov_est'],
                                                 "Mean estimator": parameters_selected['mu_est'], "Penalty": parameters_selected['penalty'],
                                                 "Expected returns p.a.": annual_expected_returns(port_returns),
                                                 "Volatility p.a.": annual_volatility(port_returns),
                                                 "Sharpe ratio p.a.": annual_sharpe_ratio(annual_expected_returns=annual_expected_returns(port_returns), annual_volatility=annual_volatility(port_returns))}
                except Exception as e:
                    print(f"Error occurred for hyperparameters: {parameters_selected}. Error: {str(e)}")
                
                pbar.update()

    return output

*3.3. Get it all together*

In [97]:
def hyperparameter_portfolio_optimization(data):

    selected_strategy = get_portfolio_strategy()
    selected_covest = get_covest_candidates()
    selected_muest = get_muest_candidates()
    selected_shrinkage = get_shrinkage_candidates()

    hyperparameter_values = {'cov_est': selected_covest, 'mu_est': selected_muest,'penalty': selected_shrinkage}

    portfolio_data = tune_portfolio(data=data, strategy=selected_strategy, is_period = 24, parameter_values=hyperparameter_values)
    output = pd.DataFrame(portfolio_data).T

    return output
    

In [102]:
output = hyperparameter_portfolio_optimization(data = stock_data)

Invalid input. Please enter either 1 or 2.
Invalid answer. Please enter either 'yes' or 'no'.
Invalid answer. Please enter either 'yes' or 'no'.


100%|██████████| 18/18 [00:49<00:00,  2.74s/it, Covariance matrix: ledoit_wolf | Expected retuns: ema_historical_return | Penalty value: 0.2]   


In [103]:
output

Unnamed: 0,Strategy,Covariance estimator,Mean estimator,Penalty,Expected returns p.a.,Volatility p.a.,Sharpe ratio p.a.
port_sample_mean_0.0,maximum sharpe,sample_cov,mean_historical_return,0.0,0.124,0.1617,0.7669
port_sample_mean_0.1,maximum sharpe,sample_cov,mean_historical_return,0.1,0.2217,0.1515,1.4634
port_sample_mean_0.2,maximum sharpe,sample_cov,mean_historical_return,0.2,0.2562,0.1458,1.7572
port_sample_ema_0.0,maximum sharpe,sample_cov,ema_historical_return,0.0,0.2244,0.1802,1.2453
port_sample_ema_0.1,maximum sharpe,sample_cov,ema_historical_return,0.1,0.3134,0.1686,1.8588
port_sample_ema_0.2,maximum sharpe,sample_cov,ema_historical_return,0.2,0.3475,0.1601,2.1705
port_semi_mean_0.0,maximum sharpe,semicovariance,mean_historical_return,0.0,0.2205,0.2138,1.0313
port_semi_mean_0.1,maximum sharpe,semicovariance,mean_historical_return,0.1,0.3186,0.1631,1.9534
port_semi_mean_0.2,maximum sharpe,semicovariance,mean_historical_return,0.2,0.3086,0.1444,2.1371
port_semi_ema_0.0,maximum sharpe,semicovariance,ema_historical_return,0.0,0.29,0.2272,1.2764


**4. Plot results**

In [33]:
mvp_candidate_metrics = risk_metrics(monthly_returns=mvp_candidate_returns)

In [34]:
print(mvp_candidate_metrics)

                    name  expected_returns  volatility
0      mvp_sample_mean_0            0.1150      0.1485
1   mvp_sample_mean_0.25            0.1252      0.1506
2    mvp_sample_mean_0.5            0.1279      0.1570
3        mvp_semi_mean_0            0.1042      0.1594
4     mvp_semi_mean_0.25            0.1266      0.1532
5      mvp_semi_mean_0.5            0.1269      0.1605
6          mvp_lw_mean_0            0.1243      0.1433
7       mvp_lw_mean_0.25            0.1305      0.1548
8        mvp_lw_mean_0.5            0.1293      0.1620
9          mvp_oa_mean_0            0.1227      0.1436
10      mvp_oa_mean_0.25            0.1262      0.1547
11       mvp_oa_mean_0.5            0.1287      0.1610


In [35]:
# plot it

In [36]:
mvp_best = mvp_candidate_metrics[mvp_candidate_metrics.volatility == mvp_candidate_metrics.volatility.min()]

In [37]:
# # #portfolio with least volatility
# mvp_best = df_mvp[df_mvp.volatility == df_mvp.volatility.min()]

In [38]:
dax_metrics = risk_metrics(monthly_returns=dax_monthly_returns.loc[mvp_candidate_returns.index,])

In [39]:
dax_constituents_metrics = dax_metrics.merge(dax_constituents_info, how = "left", left_on = "name", right_on = "Ticker")

In [40]:
print(dax_constituents_metrics.head(5))

      name  expected_returns  volatility   Ticker     Name  \
0   ADS.DE            0.1466      0.2640   ADS.DE   Adidas   
1   AIR.DE            0.2156      0.3476   AIR.DE   Airbus   
2   ALV.DE            0.1399      0.2787   ALV.DE  Allianz   
3   BAS.DE            0.1224      0.2595   BAS.DE     BASF   
4  BAYN.DE            0.1208      0.2562  BAYN.DE    Bayer   

                Sector  
0                Other  
1  Aerospace & Defence  
2   Financial Services  
3            Chemicals  
4      Pharmaceuticals  


In [41]:
fig = px.scatter(
    dax_constituents_metrics, 
    x = "volatility", 
    y = "expected_returns", 
    color = "Sector",
    color_discrete_sequence=px.colors.qualitative.Set1,
    labels={"expected_returns": "Expected returns p.a.", "volatility": "Volatility p.a."},
    hover_name = "Name",
    hover_data={"expected_returns":True, "volatility":True,"Sector":False})
fig.update_layout(
    title_text="Out-of-sample Risk-return matrix", 
    title_x=0.5, 
    font={'size': 15}, 
    hoverlabel = {"font_size": 15}, 
    width=1200, 
    height=700)
fig.add_trace(
    go.Scatter(
        x=mvp_best.volatility, 
        y=mvp_best.expected_returns,
        mode='markers',
        marker_symbol="star",
        name='Minimum Variance Portfolio',
        marker=dict(line=dict(color="black", width=3)),
        hovertemplate='<b>Minimum variance portfolio</b> <br><br>Volatility p.a.=%{x} <br>Expected returns p.a.=%{y}'
        ))
fig.update_traces(marker_size=30)
fig.show()

#code should be structured (put it in different folder), 