<a href="https://colab.research.google.com/github/saint-broseph/ai-portfolio-optimization-lstm/blob/main/aivsmvo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install yfinance PyPortfolioOpt matplotlib seaborn

In [None]:
import yfinance as yf
import pandas as pd

tickers = ["BTC-USD", "ETH-USD", "SPY", "GLD", "NVDA"]
df = yf.download(tickers, start="2019-01-01", end="2024-12-31", auto_adjust=True)

if isinstance(df.columns, pd.MultiIndex):
    data = df['Close']
else:
    data = df
data = data.dropna()
returns = data.pct_change().dropna()

print(f"Successfully downloaded data for: {list(data.columns)}")
print(f"Data shape: {data.shape}")
data.head()

In [None]:
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
import matplotlib.pyplot as plt
# 1. Calculate the expected annual returns and the annual sample covariance matrix
# This tells the math model how much each asset earns and how risky/volatile it is
mu = expected_returns.mean_historical_return(data)
S = risk_models.sample_cov(data)
# 2. Optimize for the "Max Sharpe Ratio" (the best return-to-risk balance)
ef = EfficientFrontier(mu, S)
weights = ef.max_sharpe()
cleaned_weights = ef.clean_weights()
print("--- Traditional MVO Weights ---")
for asset, weight in cleaned_weights.items():
    print(f"{asset}: {weight*100:.2f}%")
# 3. Display Performance
print("\n--- Expected Performance ---")
performance = ef.portfolio_performance(verbose=True)

In [None]:
import numpy as np
def create_features(df):
    # Calculate daily returns
    returns = df.pct_change()
    # Calculate 14-day RSI (Relative Strength Index) - a momentum indicator
    delta = returns.diff()
    up = delta.clip(lower=0)
    down = -1 * delta.clip(upper=0)
    ema_up = up.ewm(com=13, adjust=False).mean()
    ema_down = down.ewm(com=13, adjust=False).mean()
    rs = ema_up / ema_down
    rsi = 100 - (100 / (1 + rs))
    # Calculate 20-day Volatility
    vol = returns.rolling(window=20).std()
    return returns, rsi, vol
returns, rsi, vol = create_features(data)
# Combine them into a single dataset and drop the "NaN" rows from the beginning
# We will use these to feed the LSTM in the next step
features = pd.concat([returns, rsi, vol], axis=1).dropna()
print(f"Features created. New shape: {features.shape}")

In [None]:
import pandas as pd
import numpy as np

def calculate_max_drawdown(portfolio_returns):
    # Fix: Convert numpy array to pandas Series if needed
    if isinstance(portfolio_returns, np.ndarray):
        portfolio_returns = pd.Series(portfolio_returns)

    # Calculate cumulative wealth (starting at 1.0)
    cumulative_returns = (1 + portfolio_returns).cumprod()

    # Calculate the peak wealth at each point in time using cummax()
    # (cummax is faster and more reliable than expanding().max())
    peak = cumulative_returns.cummax()

    # Calculate the percentage drop from the peak
    drawdown = (cumulative_returns / peak) - 1
    return drawdown.min()

# Calculate MDD for an equal-weighted portfolio as a quick test
equal_weights = np.array([1/len(tickers)] * len(tickers))
test_port_returns = (returns.dropna() * equal_weights).sum(axis=1)
print(f"Max Drawdown of a simple Equal-Weighted Portfolio: {calculate_max_drawdown(test_port_returns):.2%}")

In [None]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# 1. Scale all features (Returns, RSI, Volatility)
scaler = MinMaxScaler()
scaled_features = scaler.fit_transform(features)

# 2. Create Sequences
def create_sequences(data, seq_length):
    xs, ys = [], []
    for i in range(len(data) - seq_length):
        x = data[i : (i + seq_length)]
        # Target: The actual returns of the assets on the very next day
        y = data[i + seq_length, :len(tickers)]
        xs.append(x)
        ys.append(y)
    return np.array(xs), np.array(ys)

seq_length = 30 # Looking back 30 trading days
X, y = create_sequences(scaled_features, seq_length)

# 3. Split: 80% to train the AI, 20% to test it on "unseen" future data
split = int(len(X) * 0.8)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

# Convert to PyTorch Tensors (The format the AI brain speaks)
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

print(f"Training shape: {X_train.shape}") # Should be [Samples, 30, Number_of_Features]

In [None]:
class LSTMRebalancer(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTMRebalancer, self).__init__()
        # The LSTM layer processes the time-series "memory"
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        # The Linear layer turns that memory into a portfolio decision
        self.fc = nn.Linear(hidden_size, output_size)
        # Softmax ensures the weights are positive and sum to 1.0 (100%)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        _, (hn, _) = self.lstm(x)
        out = self.fc(hn[-1]) # Use the last hidden state
        return self.softmax(out)

# Initialize
# input_size = number of columns in your features (Returns + RSI + Vol)
model = LSTMRebalancer(input_size=X_train.shape[2], hidden_size=64, output_size=len(tickers))
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
def robust_drawdown_loss(predicted_weights, actual_returns):
    # Calculate portfolio return
    portfolio_return = torch.sum(predicted_weights * actual_returns, dim=1)

    mean_return = torch.mean(portfolio_return)

    # Check if we actually have negative returns to penalize
    negative_returns = portfolio_return[portfolio_return < 0]
    if len(negative_returns) > 0:
        risk_penalty = torch.mean(torch.square(negative_returns)) * 10
    else:
        risk_penalty = 0.0

    return -mean_return + risk_penalty

# --- RE-INITIALIZE MODEL TO CLEAR THE 'NAN' BRAIN ---
model = LSTMRebalancer(input_size=X_train.shape[2], hidden_size=64, output_size=len(tickers))
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005) # Lower learning rate

# --- RE-RUN TRAINING ---
epochs = 100
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()

    outputs = model(X_train)
    loss = robust_drawdown_loss(outputs, y_train)

    # Check for NaN before moving forward
    if torch.isnan(loss):
        print(f"NaN detected at epoch {epoch}")
        break

    loss.backward()

    # Gradient Clipping: Prevents the math from exploding
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

    optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.6f}")

In [None]:
import matplotlib.pyplot as plt

# 1. Get AI Predictions for the test set
model.eval()
with torch.no_grad():
    ai_weights = model(X_test).numpy()
# 2. Get actual returns for the test period
# (The returns of the assets on the days the AI was picking weights for)
test_returns = returns.iloc[-len(ai_weights):].values

# 3. Calculate Daily Portfolio Returns
ai_portfolio_rets = np.sum(ai_weights * test_returns, axis=1)
equal_weights = np.array([1/len(tickers)] * len(tickers))
eq_portfolio_rets = np.sum(equal_weights * test_returns, axis=1)

# Note: We use the weights from Step 3 for the MVO baseline
mvo_w = np.array([cleaned_weights[t] for t in tickers])
mvo_portfolio_rets = np.sum(mvo_w * test_returns, axis=1)

# 4. Calculate Cumulative Growth
ai_cum = (1 + ai_portfolio_rets).cumprod()
eq_cum = (1 + eq_portfolio_rets).cumprod()
mvo_cum = (1 + mvo_portfolio_rets).cumprod()

# 5. PLOT THE RESULTS
plt.figure(figsize=(12, 6))
plt.plot(ai_cum, label=f'AI Strategy (Drawdown-Optimized)', lw=2, color='blue')
plt.plot(mvo_cum, label='Traditional Math (MVO)', lw=1.5, color='green', linestyle='--')
plt.plot(eq_cum, label='Equal Weighted (Benchmark)', lw=1, color='gray', alpha=0.7)

plt.title('Final Performance Comparison: AI vs Traditional Math', fontsize=14)
plt.xlabel('Days in Test Period')
plt.ylabel('Cumulative Growth of $1')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# 6. Final Metric Check
print(f"AI Final Return: {(ai_cum[-1]-1)*100:.2f}%")
print(f"MVO Final Return: {(mvo_cum[-1]-1)*100:.2f}%")
print(f"AI Max Drawdown: {calculate_max_drawdown(ai_portfolio_rets)*100:.2f}%")
print(f"MVO Max Drawdown: {calculate_max_drawdown(mvo_portfolio_rets)*100:.2f}%")

In [None]:
last_ai_weights = ai_weights[-1]
mvo_weights_list = [cleaned_weights[t] for t in tickers]

weight_df = pd.DataFrame({
    'Asset': tickers,
    'AI Weights': last_ai_weights,
    'MVO Weights': mvo_weights_list
})
print(weight_df)

In [None]:
plt.savefig('performance_results.pdf', bbox_inches='tight', dpi=300)

In [None]:
import numpy as np

def professional_backtest(weights, asset_returns, fee_rate=0.001):
    """
    weights: (N_days, N_assets)
    asset_returns: (N_days, N_assets)
    fee_rate: 0.1% per trade
    """
    n_days = len(weights)
    portfolio_returns = []

    # Start with initial weights
    current_weights = weights[0]

    for t in range(n_days):
        # 1. Calculate returns for the day based on current weights
        daily_ret = np.sum(weights[t] * asset_returns[t])

        # 2. Calculate Transaction Costs
        # We pay fees only on the CHANGE in weights (buying/selling)
        if t > 0:
            weight_change = np.sum(np.abs(weights[t] - weights[t-1]))
            transaction_cost = weight_change * fee_rate
        else:
            transaction_cost = fee_rate # Initial setup cost

        portfolio_returns.append(daily_ret - transaction_cost)

    return np.array(portfolio_returns)

# --- Execute Backtest ---

# 1. Calculate Professional Returns
ai_pro_rets = professional_backtest(ai_weights, test_returns)
mvo_pro_rets = professional_backtest(np.tile(mvo_w, (len(ai_weights), 1)), test_returns)

# 2. Calculate Sharpe Ratio (Risk-Adjusted Return)
# Formula: (Mean Return / Standard Deviation) * sqrt(252 trading days)
def calculate_sharpe(rets):
    if len(rets) == 0: return 0
    return (np.mean(rets) / np.std(rets)) * np.sqrt(252)

# 3. Final Metrics
print(f"--- Professional Results (Post-Fees) ---")
print(f"AI Final Return: {(np.prod(1 + ai_pro_rets) - 1)*100:.2f}%")
print(f"MVO Final Return: {(np.prod(1 + mvo_pro_rets) - 1)*100:.2f}%")
print("-" * 30)
print(f"AI Sharpe Ratio: {calculate_sharpe(ai_pro_rets):.2f}")
print(f"MVO Sharpe Ratio: {calculate_sharpe(mvo_pro_rets):.2f}")
print("-" * 30)
print(f"AI Max Drawdown: {calculate_max_drawdown(ai_pro_rets)*100:.2f}%")
print(f"MVO Max Drawdown: {calculate_max_drawdown(mvo_pro_rets)*100:.2f}%")

In [None]:
def calculate_sortino(rets):
    # Only consider negative returns for the denominator
    downside_rets = rets[rets < 0]
    if len(downside_rets) == 0: return 0
    return (np.mean(rets) / np.std(downside_rets)) * np.sqrt(252)

print(f"AI Sortino Ratio: {calculate_sortino(ai_pro_rets):.2f}")
print(f"MVO Sortino Ratio: {calculate_sortino(mvo_pro_rets):.2f}")