In [7]:
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import mplfinance as mpf
import os
from pypfopt.efficient_frontier import EfficientFrontier
import pypfopt.base_optimizer
from pypfopt.expected_returns import mean_historical_return
from pypfopt.risk_models import sample_cov

In [42]:
pd.options.mode.chained_assignment = None
if 'mag7.csv' not in os.listdir():
    mag7_tickers = ['MSFT', 'AAPL', 'NVDA', 'AMZN', 'GOOG', 'META', 'TSLA']
    mag7 = yf.download(mag7_tickers, start='2013-01-01', end='2023-12-31')[['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']]
    mag7.index = pd.to_datetime(mag7.index)
    mag7 = mag7.swaplevel(0, 1, axis=1).sort_index(axis=1)
    mag7.to_csv('mag7.csv')
else:
    mag7 = pd.read_csv('mag7.csv', header=[0, 1], index_col=0, parse_dates=True)


class PortfolioOptimization:
    def __init__(self, data_df: pd.DataFrame, initial_capital: float):
        self.data_df = data_df
        self.initial_capital = initial_capital
    
    def optimisation(self) -> pd.DataFrame:
        data = self.data_df.swaplevel(0, 1, axis=1).sort_index(axis=1)['Close']
        quarterly_data = data.resample('Q').last()

        allocations = []

        for i in range(1, len(quarterly_data)):
            quarter_data = data.loc[quarterly_data.index[i-1]:quarterly_data.index[i]]

            if quarter_data.isnull().values.any():
                print(f'Missing data in {quarterly_data.index[i-1]} to {quarterly_data.index[i]} iteration, dropping columns with NaNs.')
            quarter_data.dropna(axis=1, inplace=True)
            
            if quarter_data.empty:
                print(f"Skipping optimisation for {quarterly_data.index[i]} due to empty quarter data after dropping NaNs.")
                allocations.append({ticker: 0 for ticker in data.columns})
                continue

            mu = mean_historical_return(quarter_data, frequency=4)
            S = sample_cov(quarter_data, frequency=4)

            ef = EfficientFrontier(mu, S)
            try:
                weights = ef.max_sharpe()
                cleaned_weights = ef.clean_weights()
            except ValueError as e:
                print(f"Optimisation failed for {quarterly_data.index[i]}: {e}. Proceeding with equal weights.")
                n_assets = len(quarter_data.columns)
                equal_weight = 1 / n_assets
                cleaned_weights = {ticker: equal_weight for ticker in quarter_data.columns}
            
            allocations.append(cleaned_weights)
        
        # Create DataFrame for allocations
        allocations_df = pd.DataFrame(allocations, index=quarterly_data.index[1:])
        return allocations_df

class TradingStrategy:
    """
    First use Double Bollinger Band indicator to determine the trend of the stock
    Next, use MACD to determine the entry and exit points based off on the trend
    """
    def __init__(self, ticker_data: pd.DataFrame, initial_capital: float):
        self.ticker_data = ticker_data
        self.initial_capital = initial_capital
    
    def backtest(self):
        if not isinstance(self.initial_capital, (int, float)):
            return pd.Series(0).set_index(self.ticker_data.index)
        else:
            data = self.ticker_data.copy()

            data['rolling_mean'] = data['Close'].rolling(window=20).mean()
            data['rolling_std'] = data['Close'].rolling(window=20).std()
            data['A1'] = data['rolling_mean'] + (data['rolling_std'] * 2)
            # data['A1'] = data['A1'].shift(1)
            data['A2'] = data['rolling_mean'] - (data['rolling_std'] * 2)
            # data['A2'] = data['A2'].shift(1)
            data['B1'] = data['rolling_mean'] + (data['rolling_std'] * 1)
            # data['B1'] = data['B1'].shift(1)
            data['B2'] = data['rolling_mean'] - (data['rolling_std'] * 1)
            # data['B2'] = data['B2'].shift(1)

            data['26MA'] = data['Close'].ewm(span=26, adjust=False).mean()
            # data['26MA'] = data['26MA'].shift(1)
            data['12MA'] = data['Close'].ewm(span=12, adjust=False).mean()
            # data['12MA'] = data['12MA'].shift(1)
            data['MACD'] = data['12MA'] - data['26MA']
            # data['MACD'] = data['MACD'].shift(1)
            data['Signal'] = data['MACD'].ewm(span=9, adjust=False).mean()
            # data['Signal'] = data['Signal'].shift(1)


            buy_price = []
            sell_price = []
            dbb_macd_signal = []
            signal = 0

            for i in range(len(data)):
                if data['MACD'].iloc[i] > data['Signal'].iloc[i] and data['High'].iloc[i] > data['B1'].iloc[i]:
                    if signal != 1:
                        buy_price.append(data['Close'].iloc[i])
                        sell_price.append(np.nan)
                        signal = 1
                        dbb_macd_signal.append(signal)
                    else:
                        buy_price.append(np.nan)
                        sell_price.append(np.nan)
                        dbb_macd_signal.append(0)
                elif data['MACD'].iloc[i] < data['Signal'].iloc[i] and data['Low'].iloc[i] < data['B2'].iloc[i]:
                    if signal != -1:
                        buy_price.append(np.nan)
                        sell_price.append(data['Close'][i])
                        signal = -1
                        dbb_macd_signal.append(signal)
                    else:
                        buy_price.append(np.nan)
                        sell_price.append(np.nan)
                        dbb_macd_signal.append(0)
                else:
                    buy_price.append(np.nan)
                    sell_price.append(np.nan)
                    dbb_macd_signal.append(0)


            position = []
            for i in range(len(dbb_macd_signal)):
                if dbb_macd_signal[i] > 1:
                    position.append(0)
                else:
                    position.append(1)

            for i in range(len(data['Close'])):
                if dbb_macd_signal[i] == 1:
                    position[i] = 1
                elif dbb_macd_signal[i] == -1:
                    position[i] = 0
                else:
                    position[i] =  position[i-1]

            close = data['Close']
            dbb_macd_signal = pd.DataFrame(dbb_macd_signal).rename(columns={0:'dbb_macd_signal'}).set_index(data.index)
            position = pd.DataFrame(position).rename(columns={0:'dbb_macd_position'}).set_index(data.index)

            frames = [close, dbb_macd_signal, position]
            strategy = pd.concat(frames, join='inner', axis=1)

            strategy['returns'] = strategy['Close'].pct_change().dropna()
            strategy['strategy_returns'] = strategy['returns'] * strategy['dbb_macd_position']
            strategy['shares'] = np.nan
            strategy['strategy_value'] = np.nan
            capital = 1000
            shares = 0
            for i in range(1, len(strategy)):
                if strategy['dbb_macd_signal'][i] == 1:
                    shares = capital / strategy['Close'][i]
                    strategy['shares'][i] = shares
                    capital = 0
                elif strategy['dbb_macd_signal'][i] == -1:
                    if shares > 0:
                        capital = shares * strategy['Close'][i]
                        shares = 0
                strategy['strategy_value'][i] = capital + shares * strategy['Close'][i]
            strategy['strategy_value'].fillna(method='ffill', inplace=True)

            return strategy['strategy_value']

In [44]:
portfolio_optimization = PortfolioOptimization(mag7, 10000)
allocations = portfolio_optimization.optimisation()

initial_capital = 10000
overall_time_series_list = []

for index, row in allocations.iterrows():
    last_month = index.strftime('%Y-%m')
    first_month = (index - pd.DateOffset(months=3)).strftime('%Y-%m')
    monthly_capital_value = 0
    end_of_month_value = 0
    time_series_returns_list = []
    for ticker in row.index:
        weight = row[ticker]
        allocated_capital = weight * initial_capital
        trading_strategy = TradingStrategy(mag7[ticker].loc[first_month:last_month], allocated_capital)
        strategy_value = trading_strategy.backtest()
        time_series_returns_list.append(strategy_value)
        post_trade_value = strategy_value[-1]
        monthly_capital_value += post_trade_value
    end_of_month_time_series = pd.concat(time_series_returns_list, axis=1).sum(axis=1)
    initial_capital = monthly_capital_value
    end_of_month_value = initial_capital
overall_time_series_list.append(end_of_month_time_series)
overall_time_series = pd.concat(overall_time_series_list, axis=1)

Missing data in 2013-03-31 00:00:00 to 2013-06-30 00:00:00 iteration, dropping columns with NaNs.
Missing data in 2013-06-30 00:00:00 to 2013-09-30 00:00:00 iteration, dropping columns with NaNs.
Missing data in 2013-09-30 00:00:00 to 2013-12-31 00:00:00 iteration, dropping columns with NaNs.
Optimisation failed for 2013-12-31 00:00:00: at least one of the assets must have an expected return exceeding the risk-free rate. Proceeding with equal weights.
Missing data in 2013-12-31 00:00:00 to 2014-03-31 00:00:00 iteration, dropping columns with NaNs.
Optimisation failed for 2014-06-30 00:00:00: at least one of the assets must have an expected return exceeding the risk-free rate. Proceeding with equal weights.
Optimisation failed for 2014-09-30 00:00:00: at least one of the assets must have an expected return exceeding the risk-free rate. Proceeding with equal weights.
Optimisation failed for 2014-12-31 00:00:00: at least one of the assets must have an expected return exceeding the risk-fr

In [47]:
overall_time_series

Unnamed: 0_level_0,0
Date,Unnamed: 1_level_1
2023-09-01,0.000000
2023-09-05,7000.000000
2023-09-06,7000.000000
2023-09-07,7000.000000
2023-09-08,7000.000000
...,...
2023-12-22,7027.276241
2023-12-26,7052.812902
2023-12-27,7073.305329
2023-12-28,7045.467628


In [None]:
class PerformanceMetrics:
    def __init__(self, capital_series: pd.DataFrame, risk_free_rate: float = 0.01):
        self.capital_series = capital_series
        self.risk_free_rate = risk_free_rate

    def compute_metrics(self):
        returns = self.capital_series.pct_change().dropna()
        
        total_return = self.capital_series.iloc[-1] / self.capital_series.iloc[0] - 1
        annual_return = (1 + total_return) ** (252 / len(self.capital_series)) - 1
        annual_volatility = returns.std() * np.sqrt(252)
        sharpe_ratio = (annual_return - self.risk_free_rate) / annual_volatility

        downside_returns = returns[returns < 0]
        downside_volatility = downside_returns.std() * np.sqrt(252)
        sortino_ratio = (annual_return - self.risk_free_rate) / downside_volatility

        rolling_max = self.capital_series.cummax()
        drawdown = self.capital_series / rolling_max - 1
        max_drawdown = drawdown.min()

        metrics = {
            "Total Return": total_return,
            "Annual Return": annual_return,
            "Annual Volatility": annual_volatility,
            "Sharpe Ratio": sharpe_ratio,
            "Sortino Ratio": sortino_ratio,
            "Maximum Drawdown": max_drawdown
        }

        return metrics