In [17]:
# My Project: Optimized Risk Parity Portfolio Backtester
# Overview:
# This Jupyter Colab notebook implements an advanced Risk Parity Portfolio
# Optimization model with reinforcement learning and sophisticated backtesting.
# The project allocates capital to assets such that each contributes equally to
# the portfolio's total risk, using historical asset data and a covariance matrix.
# Key features:
# - User inputs for asset tickers, start date, and end date to fetch historical
#   data from yfinance
# - Robust risk parity optimization using CVXPY, accounting for covariance
#   matrix uncertainty
# - Reinforcement Learning (Q-learning) to dynamically adjust portfolio weights
#   based on market conditions, maximizing risk-adjusted returns
# - Sophisticated backtesting with transaction costs, slippage, rolling
#   covariance estimation, and portfolio turnover tracking
# - Comprehensive performance metrics: Sharpe ratio, Sortino ratio, annualized
#   return, maximum drawdown, Calmar ratio, volatility, Omega ratio, and
#   Conditional Value-at-Risk (CVaR).
# - Visualizations for portfolio weights, performance, turnover, and CVaR over time

In [18]:
# CELL 2 START
# Import required libraries
import numpy as np
import pandas as pd
import yfinance as yf
import cvxpy as cp
import matplotlib.pyplot as plt
from datetime import datetime, date
import os

# Seed NumPy's legacy global random number generator so repeated runs draw the
# same random sequence. The np.random.rand / np.random.randint calls made in
# QLearningAgent.choose_action read from this generator.
np.random.seed(42)

In [None]:
# CELL 3 START
# Step 1: User Input and Data Collection
def validate_date(date_str):
    """
    Check that ``date_str`` is a real calendar date written as YYYY-MM-DD.

    Parameters
    ----------
    date_str : str
        Candidate date, typically raw user input.

    Returns
    -------
    bool
        True when ``datetime.strptime`` accepts the value with the
        ``%Y-%m-%d`` format, False otherwise. Non-string input (for example
        ``None``) is reported as invalid instead of raising.
    """
    try:
        datetime.strptime(date_str, '%Y-%m-%d')
    except (TypeError, ValueError):
        # ValueError: wrong layout or impossible date (e.g. 2021-02-30).
        # TypeError: the argument was not a string at all.
        return False
    return True

def load_cached_data(cache_file, tickers, start_date, end_date):
    """
    Load prices from the CSV cache when it can serve the requested window.

    The cache is used only if the file exists and parses, holds exactly the
    requested tickers, and its date index spans ``start_date``..``end_date``.
    On a hit the prices are restricted to ``start_date <= date < end_date``
    (``end_date`` is excluded here), with columns in the order of ``tickers``.

    Parameters
    ----------
    cache_file : str
        Path of the CSV written by a previous run (dates in the first column).
    tickers : list of str
        Requested symbols; also fixes the column order of the result.
    start_date, end_date : str
        Requested window as YYYY-MM-DD strings.

    Returns
    -------
    tuple
        ``(prices, returns)`` DataFrames on a cache hit, ``(None, None)`` on
        any miss (absent, unreadable, empty, or non-covering cache).
    """
    if not os.path.exists(cache_file):
        return None, None

    try:
        cached_data = pd.read_csv(cache_file, index_col=0, parse_dates=True)
        requested_start = pd.Timestamp(start_date)
        requested_end = pd.Timestamp(end_date)
    except (pd.errors.EmptyDataError, pd.errors.ParserError, ValueError, OSError):
        # An unreadable cache is treated as a miss so the caller re-downloads.
        print("Cache file could not be read; ignoring it.")
        return None, None

    if cached_data.empty or not isinstance(cached_data.index, pd.DatetimeIndex):
        return None, None
    if cached_data.index.tz is not None:
        # Compare on naive timestamps, like the naive requested dates.
        cached_data.index = cached_data.index.tz_localize(None)

    if set(tickers) != set(cached_data.columns):
        return None, None

    # Compare real dates rather than formatted strings.
    cached_start = cached_data.index.min().normalize()
    cached_end = cached_data.index.max().normalize()
    if cached_start > requested_start or cached_end < requested_end:
        return None, None

    print("Using cached data.")
    # Return only the requested window, not everything the cache happens to hold.
    in_window = (cached_data.index >= requested_start) & (cached_data.index < requested_end)
    data = cached_data.loc[in_window, list(tickers)]
    data = data.ffill().dropna()
    returns = data.pct_change().dropna()
    return data, returns

def fetch_data(tickers, start_date, end_date):
    """
    Fetch historical adjusted close prices for given tickers from Yahoo Finance.

    The returned frames always have one column per requested ticker, in the
    order (and spelling) of ``tickers``. Downstream code multiplies weight
    vectors with these columns by position, so the order must not depend on
    how the download happens to arrange its columns.

    Parameters
    ----------
    tickers : list of str or str
        Symbols to download. A single string is treated as a one-item list.
    start_date, end_date : str
        Window passed to ``yf.download`` as ``start`` / ``end``.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(prices, returns)``: forward-filled prices with incomplete rows
        dropped, and their simple percentage changes.

    Raises
    ------
    ValueError
        If nothing was downloaded, a requested ticker has no column, or no
        row survives cleaning.
    """
    requested = [tickers] if isinstance(tickers, str) else list(tickers)

    data = yf.download(requested, start=start_date, end=end_date, auto_adjust=True, progress=False)['Close']
    if isinstance(data, pd.Series):
        # A one-ticker download may yield a Series; keep the DataFrame contract.
        data = data.to_frame(name=requested[0])
    if data.empty:
        raise ValueError("No data retrieved from Yahoo Finance.")

    # Match case-insensitively, then restore the caller's order and spelling.
    by_upper = {str(col).upper(): col for col in data.columns}
    missing = [t for t in requested if t.upper() not in by_upper]
    if missing:
        raise ValueError(f"No price data returned for: {', '.join(missing)}")
    data = data[[by_upper[t.upper()] for t in requested]].set_axis(requested, axis=1)

    data = data.ffill().dropna()
    if data.empty:
        # Typically one ticker has no prices at all, so every row is incomplete.
        raise ValueError("No complete price rows remain after cleaning the downloaded data.")
    returns = data.pct_change().dropna()
    return data, returns

# Cache file path
cache_file = "market_data_cache.csv"

# Get user input.
# Tickers are trimmed, upper-cased and de-duplicated (first occurrence wins):
# later cells select price columns with prices[tickers], so every entry must
# be unique. Upper-casing assumes the downloaded columns are upper-case
# symbols -- TODO confirm against the installed yfinance version.
tickers = list(dict.fromkeys(
    t.strip().upper()
    for t in input("Enter tickers (comma-separated, e.g., SPY,TLT,GLD): ").split(",")
    if t.strip()
))
# Surrounding whitespace would otherwise fail the strict date parse below.
start_date = input("Enter start date (YYYY-MM-DD, e.g., 2021-01-01): ").strip()
end_date = input("Enter end date (YYYY-MM-DD, e.g., 2025-01-01): ").strip()

# Validate inputs
if not tickers:
    print("No valid tickers provided. Please enter at least one ticker.")
    raise ValueError("No valid tickers provided.")

if not validate_date(start_date):
    print("Invalid start date format. Use YYYY-MM-DD.")
    raise ValueError("Invalid start date format.")

if not validate_date(end_date) or datetime.strptime(end_date, '%Y-%m-%d') <= datetime.strptime(start_date, '%Y-%m-%d'):
    print("Invalid end date. Use YYYY-MM-DD and ensure end date is after start date.")
    raise ValueError("Invalid end date.")

# Try loading cached data
prices, returns = load_cached_data(cache_file, tickers, start_date, end_date)

# Fetch data if no valid cache
if prices is None or returns is None:
    try:
        prices, returns = fetch_data(tickers, start_date, end_date)
        prices.to_csv(cache_file)
        print("Data fetched successfully and cached!")
    except Exception as e:
        print(f"Error fetching data: {e}")
        print("Possible rate limit. Wait 5-10 minutes, try fewer tickers, or check your network.")
        # Chain the original exception so the traceback keeps the real cause.
        raise SystemExit("Exiting due to data fetching failure.") from e

# Validate data
if prices.empty or returns.empty:
    print("No valid data retrieved. Exiting.")
    raise SystemExit("Exiting due to empty data.")

# Put columns in the user's ticker order. Later cells combine weight vectors
# with these columns by position, so prices, returns and tickers must agree.
prices = prices[tickers]
returns = returns[tickers]

print(f"Prices shape: {prices.shape}")
print(f"Returns shape: {returns.shape}")
# CELL 3 END

In [None]:
# CELL 4 START
# Step 2: Robust Risk Parity Optimization
def _solve_with_iteration_cap(problem, max_iter):
    """Solve ``problem`` with an exponential-cone solver, capping iterations at ``max_iter``."""
    installed = cp.installed_solvers()
    # The iteration-limit option is spelled differently per solver.
    if "CLARABEL" in installed:
        problem.solve(solver="CLARABEL", max_iter=max_iter)
    elif "SCS" in installed:
        problem.solve(solver="SCS", max_iters=max_iter)
    elif "ECOS" in installed:
        problem.solve(solver="ECOS", max_iters=max_iter)
    else:
        problem.solve()


def risk_parity_weights(returns, max_iter=1000, robust=False, uncertainty_scale=0.1):
    """
    Compute long-only risk parity weights using CVXPY, with optional robust optimization.

    Equal risk contribution is obtained from the convex programme

        minimise  0.5 * y' S y  -  (1/n) * sum(log(y_i))

    whose optimality condition ``S y = (1/n) / y`` gives ``y_i * (S y)_i = 1/n``
    for every asset. Normalising ``y`` to sum to one yields weights whose
    risk contributions ``w_i * (S w)_i`` are all equal. Writing the risk
    contributions directly (a square root of a quadratic form, products and
    quotients of expressions) is not DCP-compliant and CVXPY rejects it.

    Parameters
    ----------
    returns : pandas.DataFrame
        Asset returns, one column per asset.
    max_iter : int, default 1000
        Iteration limit forwarded to the solver.
    robust : bool, default False
        When True, the covariance matrix is inflated on its diagonal by
        ``uncertainty_scale * std(covariance entries)`` before solving. This
        also makes a rank-deficient estimate positive definite.
    uncertainty_scale : float, default 0.1
        Size of the diagonal inflation used when ``robust`` is True.

    Returns
    -------
    numpy.ndarray
        Non-negative weights of length ``returns.shape[1]`` summing to 1, in
        the column order of ``returns``.

    Raises
    ------
    ValueError
        If the covariance matrix is unusable (no columns, NaN/inf entries, no
        variance) or the solver does not report an optimal solution.
    """
    n_assets = returns.shape[1]
    if n_assets == 0:
        raise ValueError("returns must contain at least one asset column")

    cov_matrix = np.asarray(returns.cov().values, dtype=float)
    if not np.all(np.isfinite(cov_matrix)):
        raise ValueError("Covariance matrix contains NaN or infinite values")
    if n_assets == 1:
        return np.ones(1)

    if robust:
        cov_uncertainty = uncertainty_scale * np.std(cov_matrix) * np.eye(n_assets)
        cov_matrix = cov_matrix + cov_uncertainty

    # Risk parity weights do not change when the covariance is multiplied by a
    # positive constant; rescaling to unit average variance keeps the solver
    # away from the tiny magnitudes of daily-return covariances.
    cov_matrix = (cov_matrix + cov_matrix.T) / 2.0
    scale = np.trace(cov_matrix) / n_assets
    if scale <= 0:
        raise ValueError("Covariance matrix has no variance; risk parity is undefined")
    cov_matrix = cov_matrix / scale

    # Express y' S y as a sum of squares via an eigen-factor, so the quadratic
    # term is convex by construction even if S has tiny negative eigenvalues.
    eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
    sqrt_factor = eigenvectors * np.sqrt(np.clip(eigenvalues, 0.0, None))

    y = cp.Variable(n_assets)
    objective = cp.Minimize(
        0.5 * cp.sum_squares(sqrt_factor.T @ y) - cp.sum(cp.log(y)) / n_assets
    )
    problem = cp.Problem(objective)
    try:
        _solve_with_iteration_cap(problem, max_iter)
    except cp.SolverError as exc:
        raise ValueError("Optimization did not converge") from exc

    # An "inaccurate" optimum is accepted; the result is renormalised below.
    if problem.status not in (cp.OPTIMAL, cp.OPTIMAL_INACCURATE) or y.value is None:
        raise ValueError("Optimization did not converge")

    raw_weights = np.clip(np.asarray(y.value, dtype=float).ravel(), 0.0, None)
    total = raw_weights.sum()
    if not np.isfinite(total) or total <= 0:
        raise ValueError("Optimization did not converge")
    return raw_weights / total

# Compute initial weights.
# risk_parity_weights builds its covariance matrix from the columns of
# `returns`, so the weight vector is labelled with those same columns.
weights = risk_parity_weights(returns, robust=True, uncertainty_scale=0.1)
weights_df = pd.Series(weights, index=returns.columns, name='Robust Risk Parity Weights')
# CELL 4 END

In [23]:
# CELL 5 START
# Step 3: Reinforcement Learning for Dynamic Weight Adjustment
class QLearningAgent:
    """
    Tabular Q-learning agent that nudges portfolio weights.

    A state is an integer bucket produced by ``get_state``. An action is the
    index of the asset whose weight ``adjust_weights`` increases.
    """

    def __init__(self, n_assets, n_states=10, learning_rate=0.1, discount_factor=0.95, epsilon=0.1):
        """
        Q-learning agent to adjust portfolio weights based on market states.

        Parameters
        ----------
        n_assets : int
            Number of assets, i.e. number of available actions.
        n_states : int, default 10
            Number of discrete market-state buckets.
        learning_rate : float, default 0.1
            Step size of the Q-value update.
        discount_factor : float, default 0.95
            Weight given to the best Q-value of the next state.
        epsilon : float, default 0.1
            Probability of picking a random action in ``choose_action``.
        """
        self.n_assets = n_assets
        self.n_states = n_states
        self.q_table = np.zeros((n_states, n_assets))  # rows: states, columns: actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon

    def get_state(self, returns, lookback=20):
        """
        Discretize market state based on recent returns and volatility.

        The last ``lookback`` observations are reduced to one series (for 2-D
        input, the mean across columns of each row). The ratio of that
        series' mean to its standard deviation is mapped to a bucket with
        ``int(ratio * n_states / 2 + n_states / 2)``, clamped to
        ``[0, n_states - 1]``.

        Parameters
        ----------
        returns : pandas.DataFrame, pandas.Series or numpy.ndarray
            Return history, oldest first.
        lookback : int, default 20
            Number of most recent observations used.

        Returns
        -------
        int
            State index in ``[0, n_states - 1]``. The middle bucket is
            returned when the ratio is undefined (empty window, zero or
            non-finite volatility).
        """
        window = returns.iloc[-lookback:] if hasattr(returns, 'iloc') else returns[-lookback:]
        if getattr(window, 'ndim', 1) > 1:
            # Collapse assets into one market series; int() needs a scalar ratio.
            window = window.mean(axis=1)

        ratio = 0.0
        if len(window) > 0:
            mean_return = float(window.mean())
            volatility = float(window.std())
            if np.isfinite(mean_return) and np.isfinite(volatility) and volatility != 0:
                ratio = mean_return / volatility

        raw_bucket = ratio * self.n_states / 2 + self.n_states / 2
        # Clamp on both sides: a negative bucket would index the Q-table from
        # the end (or fall outside it) instead of meaning "very bearish".
        return int(np.clip(raw_bucket, 0, self.n_states - 1))

    def choose_action(self, state):
        """
        Epsilon-greedy action selection.

        With probability ``epsilon`` a uniformly random asset index is
        returned; otherwise the action with the highest Q-value for ``state``.
        """
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.n_assets)
        return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state):
        """
        Update Q-table based on reward and next state.

        Applies ``Q[s, a] += lr * (reward + gamma * max_a' Q[s', a'] - Q[s, a])``.
        """
        best_next_action = np.argmax(self.q_table[next_state])
        td_target = reward + self.discount_factor * self.q_table[next_state, best_next_action]
        self.q_table[state, action] += self.learning_rate * (td_target - self.q_table[state, action])

    def adjust_weights(self, weights, action, step_size=0.1):
        """
        Adjust portfolio weights based on selected action.

        Adds ``step_size`` to the weight at index ``action``, clips that
        weight to ``[0, 1]`` and rescales the vector to sum to one.

        Parameters
        ----------
        weights : array-like
            Current weights; not modified.
        action : int
            Index of the weight to change.
        step_size : float, default 0.1
            Amount added before clipping and renormalising.

        Returns
        -------
        numpy.ndarray
            New weight vector summing to 1. If the adjusted vector sums to
            zero or less, equal weights are returned.
        """
        new_weights = np.array(weights, dtype=float)  # copy; also accepts lists
        new_weights[action] = min(max(new_weights[action] + step_size, 0.0), 1.0)
        total = np.sum(new_weights)
        if total <= 0:
            # Nothing to renormalise; fall back to an equal allocation.
            return np.full(len(new_weights), 1.0 / len(new_weights))
        return new_weights / total

def train_rl_agent(returns, weights, lookback=20, n_episodes=50):
    """
    Train Q-learning agent to optimize portfolio weights.

    For every episode and every time index ``t`` from ``lookback`` to the end
    of ``returns``, the agent observes the state of the history before ``t``,
    picks an action, and is rewarded with the annualised Sharpe-style ratio
    (mean / standard deviation * sqrt(252)) of the adjusted weights over the
    next ``lookback`` rows. The reward is 0 when that standard deviation is
    not positive.

    Parameters
    ----------
    returns : pandas.DataFrame
        Asset returns, one column per asset, in the same order as ``weights``.
    weights : array-like
        Base weights that each action adjusts.
    lookback : int, default 20
        Length of both the state window and the reward window.
    n_episodes : int, default 50
        Number of passes over the return history.

    Returns
    -------
    QLearningAgent
        The trained agent.
    """
    # Size the agent from the data itself rather than a notebook-level global.
    agent = QLearningAgent(n_assets=returns.shape[1])
    base_weights = np.asarray(weights, dtype=float)
    return_values = returns.to_numpy(dtype=float)
    n_obs = len(returns)

    # States depend only on the return history, not on the episode, so they
    # are computed once. The cross-sectional mean gives get_state a single
    # market series, and `lookback` is passed through explicitly.
    market = returns.mean(axis=1)
    states = {t: agent.get_state(market.iloc[:t], lookback) for t in range(lookback, n_obs + 1)}

    # The reward is a deterministic function of (t, action); memoise it
    # instead of recomputing it in every episode.
    reward_cache = {}

    def _reward(t, action):
        key = (t, int(action))
        if key not in reward_cache:
            trial_weights = agent.adjust_weights(base_weights, action)
            window_returns = return_values[t:t + lookback] @ trial_weights
            spread = np.std(window_returns)
            reward_cache[key] = np.mean(window_returns) / spread * np.sqrt(252) if spread > 0 else 0
        return reward_cache[key]

    for _ in range(n_episodes):
        for t in range(lookback, n_obs):
            state = states[t]
            action = agent.choose_action(state)
            agent.update(state, action, _reward(t, action), states[t + 1])

    return agent

# Fit the Q-learning agent on the full return history, starting from the
# initial risk parity weights.
rl_agent = train_rl_agent(returns=returns, weights=weights)
# CELL 5 END

In [23]:
# CELL 6 START
# Step 4: Sophisticated Backtesting Framework
def _period_end_flags(index, rebalance_freq):
    """Flag each bar that is the last observation of its ``rebalance_freq`` period."""
    flags = np.zeros(len(index), dtype=bool)
    if rebalance_freq is None or len(index) < 2:
        return flags
    periods = index.to_period(rebalance_freq)
    # A bar closes its period when the next bar belongs to a different one.
    # The final bar is never flagged: nothing follows it to trade into.
    flags[:-1] = periods[1:] != periods[:-1]
    return flags


def _performance_metrics(portfolio_series):
    """Summarise a portfolio value series into the backtest's metric dictionary."""
    portfolio_returns = portfolio_series.pct_change().dropna()

    annualized_return = np.mean(portfolio_returns) * 252
    volatility = np.std(portfolio_returns) * np.sqrt(252)
    sharpe_ratio = annualized_return / volatility if volatility > 0 else 0

    # Sortino uses downside deviation: root mean square of the shortfalls
    # below 0 taken over ALL periods, not the std of the negative returns.
    shortfalls = np.minimum(portfolio_returns.to_numpy(dtype=float), 0.0)
    downside_deviation = np.sqrt(np.mean(shortfalls ** 2)) * np.sqrt(252) if len(shortfalls) > 0 else 0
    sortino_ratio = annualized_return / downside_deviation if downside_deviation > 0 else 0

    max_drawdown = np.min(portfolio_series / portfolio_series.cummax() - 1)
    calmar_ratio = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0

    # Omega ratio (threshold = 0)
    threshold = 0
    gains = portfolio_returns[portfolio_returns > threshold].sum()
    losses = -portfolio_returns[portfolio_returns <= threshold].sum()
    omega_ratio = gains / losses if losses > 0 else np.inf

    # CVaR (5% worst returns)
    cvar = np.mean(portfolio_returns[portfolio_returns <= np.percentile(portfolio_returns, 5)]) if len(portfolio_returns) > 0 else 0

    return {
        'Sharpe Ratio': sharpe_ratio,
        'Sortino Ratio': sortino_ratio,
        'Annualized Return': annualized_return,
        'Max Drawdown': max_drawdown,
        'Calmar Ratio': calmar_ratio,
        'Volatility': volatility,
        'Omega Ratio': omega_ratio,
        'CVaR': cvar
    }


def backtest_portfolio(prices, returns, weights, rl_agent=None, rebalance_freq='M', lookback=20, cov_window=60, trans_cost=0.001, slippage=0.0005):
    """
    Backtest portfolio with transaction costs, slippage, rolling covariance, and RL adjustments.

    Starts with 10000 allocated by ``weights`` and marks the holdings to
    market on every bar. On the last bar of each ``rebalance_freq`` period,
    once ``cov_window`` bars have passed, the target weights are recomputed
    with ``risk_parity_weights`` on the trailing ``cov_window`` returns and
    optionally adjusted by ``rl_agent``. Holdings are then reset to those
    weights after costs.

    Parameters
    ----------
    prices, returns : pandas.DataFrame
        Price and return history containing the columns named in the
        notebook-level ``tickers`` list. Both are read in ``tickers`` order.
    weights : array-like
        Initial weights, ordered like ``tickers``.
    rl_agent : QLearningAgent, optional
        Agent used to adjust each new weight vector. ``None`` disables it.
    rebalance_freq : str or None, default 'M'
        pandas period alias ('W', 'M', 'Q', ...). The period end is the last
        bar present in the data, so months ending on a non-trading day still
        rebalance. ``None`` disables rebalancing.
    lookback : int, default 20
        State window passed to ``rl_agent.get_state``.
    cov_window : int, default 60
        Number of trailing returns used for the covariance estimate, and the
        number of bars that must pass before the first rebalance.
    trans_cost, slippage : float
        Proportional costs charged on turnover times portfolio value.

    Returns
    -------
    tuple
        ``(portfolio_series, weights_history, turnover, metrics)``: the value
        series indexed like ``prices``, the list of weight vectors (initial
        plus one per rebalance), the list of turnovers (one per rebalance),
        and the metric dictionary.

    Raises
    ------
    ValueError
        If ``prices`` has no rows.
    """
    assets = list(tickers)
    # Read prices and returns in one shared column order so the recomputed
    # weights line up with the prices they are applied to.
    asset_prices = prices[assets]
    asset_returns = returns[assets]
    if len(asset_prices) == 0:
        raise ValueError("prices must contain at least one row")

    price_values = asset_prices.to_numpy(dtype=float)  # hoisted out of the loop
    dates = asset_prices.index
    market_returns = asset_returns.mean(axis=1)  # scalar series for the RL state
    rebalance_flags = _period_end_flags(dates, rebalance_freq)

    initial_weights = np.asarray(weights, dtype=float)
    portfolio_value = [10000]
    holdings = portfolio_value[0] * initial_weights / price_values[0]
    weights_history = [initial_weights]
    turnover = []

    for i in range(1, len(asset_prices)):
        # Calculate portfolio value
        value = float(np.sum(holdings * price_values[i]))
        portfolio_value.append(value)

        if not (rebalance_flags[i] and i >= cov_window):
            continue

        # Rolling covariance for risk parity: returns known up to this bar,
        # selected by date rather than by position.
        history = asset_returns.loc[:dates[i]]
        new_weights = np.asarray(
            risk_parity_weights(history.iloc[-cov_window:], robust=True, uncertainty_scale=0.1),
            dtype=float,
        )

        # RL adjustment
        if rl_agent is not None:
            state = rl_agent.get_state(market_returns.loc[:dates[i]], lookback)
            action = rl_agent.choose_action(state)
            new_weights = rl_agent.adjust_weights(new_weights, action)

        # Calculate turnover and costs
        old_weights = holdings * price_values[i] / value
        traded = float(np.sum(np.abs(new_weights - old_weights)))
        turnover.append(traded)
        # Costs reduce the capital that is re-invested, so they show up in
        # the recorded value from the next bar onwards.
        value -= (trans_cost + slippage) * traded * value

        # Update holdings
        holdings = value * new_weights / price_values[i]
        weights_history.append(new_weights)

    portfolio_series = pd.Series(portfolio_value, index=prices.index)
    return portfolio_series, weights_history, turnover, _performance_metrics(portfolio_series)

# Execute the backtest with the trained agent and unpack its four results.
portfolio_value, weights_history, turnover, metrics = backtest_portfolio(
    prices=prices,
    returns=returns,
    weights=weights,
    rl_agent=rl_agent,
)
# CELL 6 END

In [23]:
# CELL 7 START
# Step 5: Visualization and Results
# Explicit axes instead of the pyplot state machine, so each panel is
# addressed by name.
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(ax_weights, ax_value), (ax_turnover, ax_cvar) = axes

# Portfolio weights
# weights_history holds the initial allocation followed by one entry per
# rebalance. It carries no dates, so it is plotted against the rebalance
# number rather than against the first N trading dates.
weights_df = pd.DataFrame(weights_history, columns=tickers)
weights_df.plot(ax=ax_weights, title='Portfolio Weights Over Time')
ax_weights.set_xlabel('Rebalance number (0 = initial allocation)')
ax_weights.set_ylabel('Weight')

# Portfolio value
portfolio_value.plot(ax=ax_value, title='Portfolio Value Over Time (RL-Adjusted)')
ax_value.set_xlabel('Date')
ax_value.set_ylabel('Portfolio Value ($)')

# Turnover (one value per rebalance)
if len(turnover) > 0:
    pd.Series(turnover, index=range(1, len(turnover) + 1)).plot(
        ax=ax_turnover, title='Portfolio Turnover', marker='o'
    )
else:
    # Plotting an empty Series raises; show an explanatory panel instead.
    ax_turnover.set_title('Portfolio Turnover')
    ax_turnover.text(0.5, 0.5, 'No rebalances occurred', ha='center', va='center',
                     transform=ax_turnover.transAxes)
ax_turnover.set_xlabel('Rebalance number')
ax_turnover.set_ylabel('Turnover')

# CVaR
# With a 20-observation window the 5% tail is essentially the single worst
# day of each window.
daily_returns = portfolio_value.pct_change().dropna()
rolling_cvar = daily_returns.rolling(20).apply(
    lambda x: np.mean(x[x <= np.percentile(x, 5)]), raw=True
)
if rolling_cvar.notna().any():
    rolling_cvar.plot(ax=ax_cvar, title='Rolling CVaR (20-day)')
else:
    ax_cvar.set_title('Rolling CVaR (20-day)')
    ax_cvar.text(0.5, 0.5, 'Not enough data', ha='center', va='center',
                 transform=ax_cvar.transAxes)
ax_cvar.set_xlabel('Date')
ax_cvar.set_ylabel('CVaR')

fig.tight_layout()
plt.show()

# Print performance metrics
for metric, value in metrics.items():
    if metric in ['Max Drawdown', 'Annualized Return', 'Volatility', 'CVaR']:
        print(f"{metric}: {value:.2%}")
    else:
        print(f"{metric}: {value:.2f}")
# CELL 7 END