# Q4 Comparison Between Traditional Method and Machine Learning Methods

This notebook experiments with various strategies (Markowitz and several Machine Learning algorithms)  for constructing portfolios to determine the most effective method for optimization.

In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
import os
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from scipy.optimize import minimize
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, classification_report
import sys
cwd = Path(os.getcwd())
if cwd.name == 'scripts':
    # If running from 'scripts' directory, go up one level
    project_root = cwd.parent
else:
    # Otherwise, assume current directory is the project root
    project_root = cwd

if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

from models.neural import SequenceTransformer
from models.neural import SequenceTransformer

In [2]:
DATA_PATH = project_root / 'assets' / 'datasets' / 'stock_market' / 'Stocks'
TICKERS = [
    'nvda.us.txt', 'msft.us.txt', 'aapl.us.txt', 'googl.us.txt', 'amzn.us.txt', 'fb.us.txt', 'avgo.us.txt', 'tsla.us.txt', 'tsm.us.txt', 'brk-b.us.txt', 
    'orcl.us.txt', 'jpm.us.txt', 'wmt.us.txt', 'lly.us.txt', 'v.us.txt', 'nflx.us.txt', 'ma.us.txt', 'xom.us.txt', 'jnj.us.txt', 'cost.us.txt', 
    'abbv.us.txt', 'asml.us.txt', 'hd.us.txt', 'baba.us.txt', 'bac.us.txt', 'amd.us.txt', 'pg.us.txt', 'unh.us.txt', 'ge.us.txt', 'sap.us.txt', 
    'cvx.us.txt', 'ko.us.txt', 'csco.us.txt', 'azn.us.txt', 'ibm.us.txt', 'nvo.us.txt', 'tmus.us.txt', 'wfc.us.txt', 'nvs.us.txt', 'tm.us.txt', 
    'gs.us.txt', 'pm.us.txt', 'ms.us.txt', 'crm.us.txt', 'cat.us.txt', 'abt.us.txt', 'hsbc.us.txt', 'axp.us.txt', 'mu.us.txt', 'mcd.us.txt', 
    'dis.us.txt', 'tmo.us.txt', 'bx.us.txt', 'now.us.txt', 'anet.us.txt', 't.us.txt', 'intu.us.txt', 'blk.us.txt', 'intc.us.txt', 'c.us.txt', 
    'amat.us.txt', 'lrcx.us.txt', 'qcom.us.txt', 'nee.us.txt', 'schw.us.txt', 'hdb.us.txt', 'vz.us.txt', 'ba.us.txt', 'txn.us.txt', 'amgn.us.txt', 
    'tjx.us.txt', 'isrg.us.txt', 'aph.us.txt', 'acn.us.txt', 'ul.us.txt', 'san.us.txt', 'dhr.us.txt', 'gild.us.txt', 'spgi.us.txt', 'etn.us.txt', 
    'adbe.us.txt'
]
LIMIT_PER_TICKER = 500


In [3]:
# Load data
all_data = []
for ticker_file in TICKERS:
    try:
        df = pd.read_csv(DATA_PATH / ticker_file)
        df['Ticker'] = ticker_file.split('.')[0].replace('-', '.') # Handle tickers like brk-b
        if LIMIT_PER_TICKER:
            df = df.tail(LIMIT_PER_TICKER)
        all_data.append(df)
    except FileNotFoundError:
        print(f"Warning: {ticker_file} not found. Skipping.")

if not all_data:
    raise FileNotFoundError("No stock data found. Please check the data path and tickers.")

data = pd.concat(all_data, ignore_index=True)
data['Date'] = pd.to_datetime(data['Date'])
data = data.sort_values(by=['Date', 'Ticker']).reset_index(drop=True)

print("Data loaded successfully.")

Data loaded successfully.


In [4]:

data['Return'] = data.groupby('Ticker')['Close'].pct_change()
data['Target'] = data.groupby('Ticker')['Return'].shift(-1)
data = data.dropna()
data['Target_Direction'] = (data['Target'] > 0).astype(int)

features = ['Return']
train_data, test_data = train_test_split(data, test_size=0.2, shuffle=False)

print("Data preparation complete.")


Data preparation complete.


In [10]:
def get_portfolio_performance(daily_returns, risk_free_rate=0.04):
    annual_return = daily_returns.mean() * 252

    annual_volatility = daily_returns.std() * np.sqrt(252)
    excess_return = annual_return - risk_free_rate

    if annual_volatility != 0:
        sharpe_ratio = excess_return / annual_volatility
    else:
        sharpe_ratio = 0

    return annual_return, annual_volatility, sharpe_ratio

def get_neg_sharpe_ratio(weights: np.ndarray, mean_returns: pd.Series, cov_matrix: pd.DataFrame) -> float:
    """Calculates the negative Sharpe ratio for the optimizer."""
    portfolio_return = np.sum(mean_returns * weights) * 252
    portfolio_std = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights))) * np.sqrt(252)
    sharpe_ratio = portfolio_return / portfolio_std if portfolio_std != 0 else 0
    return -sharpe_ratio

def find_optimal_weights(mean_returns: pd.Series, cov_matrix: pd.DataFrame) -> np.ndarray:
    """Finds the optimal portfolio weights that maximize the Sharpe ratio."""
    num_assets = len(mean_returns)
    args = (mean_returns, cov_matrix)
    constraints = ({'type': 'eq', 'fun': lambda x: np.sum(x) - 1})
    bounds = tuple((0, 1) for _ in range(num_assets))
    initial_weights = num_assets * [1. / num_assets]

    result = minimize(get_neg_sharpe_ratio, initial_weights, args=args,
                        method='SLSQP', bounds=bounds, constraints=constraints)

    return result.x

# Prepare data for traditional optimization
train_returns_df = train_data.pivot(index='Date', columns='Ticker', values='Return')
mean_returns = train_returns_df.mean()
cov_matrix = train_returns_df.cov()

# Calculate optimal weights for the traditional portfolio
traditional_weights = find_optimal_weights(mean_returns, cov_matrix)
traditional_weights = pd.Series(traditional_weights, index=mean_returns.index)

print("Traditional Portfolio Weights calculated.")

Traditional Portfolio Weights calculated.


In [11]:
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(train_data[features])
y_train = train_data['Target_Direction']
X_test_scaled = scaler.transform(test_data[features])
y_test = test_data['Target_Direction']

# --- Classical ML Models ---
print("\n--- Training Classical ML Models ---")
ml_models = {
    'Logistic Regression': LogisticRegression(max_iter=1000),
    'Random Forest': RandomForestClassifier(random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(random_state=42)
}
ml_predictions = {}
for name, model in ml_models.items():
    print(f"Training {name}...")
    model.fit(X_train_scaled, y_train)
    predictions = model.predict(X_test_scaled)
    ml_predictions[name] = predictions
    print(f"--- {name} Classification Report ---")
    print(classification_report(y_test, predictions, zero_division=0))



--- Training Classical ML Models ---
Training Logistic Regression...
--- Logistic Regression Classification Report ---
              precision    recall  f1-score   support

           0       0.42      0.02      0.03      3707
           1       0.54      0.98      0.70      4361

    accuracy                           0.54      8068
   macro avg       0.48      0.50      0.36      8068
weighted avg       0.49      0.54      0.39      8068

Training Random Forest...
--- Random Forest Classification Report ---
              precision    recall  f1-score   support

           0       0.46      0.47      0.46      3707
           1       0.54      0.53      0.53      4361

    accuracy                           0.50      8068
   macro avg       0.50      0.50      0.50      8068
weighted avg       0.50      0.50      0.50      8068

Training Gradient Boosting...
--- Gradient Boosting Classification Report ---
              precision    recall  f1-score   support

           0       0.46

In [12]:
# --- Deep Learning Model ---
print("\n--- Training Transformer Model ---")

def create_sequences(X, y, time_steps=10):
    Xs, ys = [], []
    for i in range(len(X) - time_steps):
        Xs.append(X[i:(i + time_steps)])
        ys.append(y[i + time_steps])
    return np.array(Xs), np.array(ys)

TIME_STEPS = 10
X_train_seq, y_train_seq = create_sequences(X_train_scaled, y_train.values, TIME_STEPS)
X_test_seq, y_test_seq = create_sequences(X_test_scaled, y_test.values, TIME_STEPS)

X_train_tensor = torch.tensor(X_train_seq, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_seq, dtype=torch.long)
X_test_tensor = torch.tensor(X_test_seq, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test_seq, dtype=torch.long)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)


def train_dl_model(model, train_loader, val_loader, epochs=50, learning_rate=0.001, patience=5):
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    best_loss = np.inf
    patience_counter = 0

    for epoch in range(epochs):
        model.train()
        for features, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for features, labels in val_loader:
                outputs = model(features)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
        
        val_loss /= len(val_loader)

        if val_loss < best_loss:
            best_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), 'best_model.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break
    
    model.load_state_dict(torch.load('best_model.pth'))

def predict_dl(model, data_loader):
    model.eval()
    predictions = []
    with torch.no_grad():
        for features, _ in data_loader:
            outputs = model(features)
            _, predicted = torch.max(outputs.data, 1)
            predictions.extend(predicted.tolist())
    return np.array(predictions)

input_dim = len(features)
transformer_model = SequenceTransformer(input_dim=input_dim, d_model=64, nhead=4, num_layers=2, dim_feedforward=128, dropout=0.3)

train_dl_model(transformer_model, train_loader, val_loader)
transformer_predictions_full = predict_dl(transformer_model, DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=128, shuffle=False))
dl_predictions = {'Transformer': transformer_predictions_full}

print(f"--- Transformer Classification Report ---")
print(classification_report(y_test_seq, transformer_predictions_full, zero_division=0))


--- Training Transformer Model ---
Early stopping at epoch 16
--- Transformer Classification Report ---
              precision    recall  f1-score   support

           0       0.00      0.00      0.00      3700
           1       0.54      1.00      0.70      4358

    accuracy                           0.54      8058
   macro avg       0.27      0.50      0.35      8058
weighted avg       0.29      0.54      0.38      8058



In [8]:
def backtest_portfolio(test_df, predictions):
    # Align predictions with test_df
    if len(predictions) < len(test_df):
        offset = len(test_df) - len(predictions)
        test_df = test_df.iloc[offset:]
    
    test_df['prediction'] = predictions

    daily_returns = []
    for date, daily_group in test_df.groupby('Date'):
        selected_tickers = daily_group[daily_group['prediction'] == 1]['Ticker'].unique()
        if len(selected_tickers) > 0:
            daily_return = daily_group[daily_group['Ticker'].isin(selected_tickers)]['Target'].mean()
            daily_returns.append(daily_return)
        else:
            daily_returns.append(0)
    return pd.Series(daily_returns)

In [13]:

all_results = {}

# --- Traditional Markowitz ---
test_returns_pivot = test_data.pivot(index='Date', columns='Ticker', values='Return')
# Align weights with test returns columns
aligned_weights = traditional_weights.reindex(test_returns_pivot.columns).fillna(0)
daily_trad_returns = (test_returns_pivot * aligned_weights).sum(axis=1)
all_results['Traditional Markowitz'] = get_portfolio_performance(daily_trad_returns)

# --- ML Models ---
for name, predictions in ml_predictions.items():
    daily_ml_returns = backtest_portfolio(test_data, predictions)
    all_results[name] = get_portfolio_performance(daily_ml_returns)

# --- DL Model ---
# Align DL predictions. They are shorter due to sequence creation.
dl_test_data = test_data.iloc[len(test_data) - len(transformer_predictions_full):]
daily_dl_returns = backtest_portfolio(dl_test_data, transformer_predictions_full)
all_results['Transformer'] = get_portfolio_performance(daily_dl_returns)

# --- Display Results ---
final_results_df = pd.DataFrame.from_dict(all_results, orient='index', columns=['Annual Return', 'Volatility', 'Sharpe Ratio'])
print("\n--- Final Portfolio Performance Comparison ---")
print(final_results_df)


--- Final Portfolio Performance Comparison ---
                       Annual Return  Volatility  Sharpe Ratio
Traditional Markowitz       0.330220    0.104143      2.786735
Logistic Regression         0.268434    0.083258      2.743690
Random Forest               0.260887    0.086415      2.556120
Gradient Boosting           0.254145    0.084847      2.523905
Transformer                 0.286766    0.083216      2.965377


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df['prediction'] = predictions
