### Import Libraries

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


### Define Configs

In [None]:
# Universe: the "Mag 7" stocks traded by the strategy.
tickers = ['MSFT', 'AAPL', 'NVDA', 'AMZN', 'GOOG', 'META', 'TSLA']

# Backtest window used when downloading price history.
start_date = "1981-01-01"
end_date = "2023-12-31"

# Capital and position-sizing constraints.
initial_capital = 1_000_000
max_weight = 0.30  # Maximum 30% of AUM in any single stock
min_shares = 10    # Trade-size threshold (in shares) used by the backtest loop

# Trading frictions, as fractions of traded value.
commission = 0.001  # 0.10%
slippage = 0.0002   # 0.02%

# Performance-measurement conventions.
risk_free_rate = 0.02
trading_days = 252  # Annual trading days


### Fetching data

In [None]:
def fetch_data(tickers, start=None, end=None):
    """
    Download daily price history for each ticker via `yf.download`.

    Args:
        tickers: iterable of ticker symbols to download.
        start: first date to request; defaults to the notebook-level `start_date`.
        end: end date to request; defaults to the notebook-level `end_date`.

    Returns:
        dict mapping ticker -> DataFrame returned by `yf.download`
        (requested with `auto_adjust=True`) with an added 'Ticker' column.
        Tickers for which the download came back empty are omitted and a
        warning is printed.
    """
    # Fall back to the configured window so existing `fetch_data(tickers)` calls keep working.
    start = start_date if start is None else start
    end = end_date if end is None else end

    data = {}
    for ticker in tickers:
        print(f"Fetching data for {ticker}...")

        df = yf.download(ticker, start=start, end=end, auto_adjust=True)

        if df.empty:
            print(f"Warning: No data for {ticker}.")
            continue

        df['Ticker'] = ticker
        data[ticker] = df
    return data

price_data = fetch_data(tickers, start=start_date, end=end_date)


### RSI Strategy Signal & Position Reference

| Column             | Value | Description                                                   |
|--------------------|-------|---------------------------------------------------------------|
| `signal`           | `1`   | Entry signal to **go long** (RSI < entry threshold, e.g. 25)  |
|                    | `-1`  | Entry signal to **go short** (RSI > short threshold, e.g. 75) |
|                    | `-9`  | **Exit long** position (RSI > 50)                             |
|                    | `9`   | **Exit short** position (RSI < 50)                            |
|                    | `NaN` | No signal on this date                                        |
| `target_direction` | `1`   | Holding a **long** position                                   |
|                    | `-1`  | Holding a **short** position                                  |
|                    | `0`   | **No position** / flat                                        |


In [None]:
def compute_rsi(series, window=14):
    """
    Relative Strength Index based on simple rolling means.

    Period-over-period changes are split into gains and losses; each is
    averaged with `rolling(window).mean()` and combined as
    100 - 100 / (1 + avg_gain / avg_loss).

    Args:
        series: price series to compute the RSI on.
        window: rolling window length.

    Returns:
        The RSI values, indexed like `series`. Entries without a full window are NaN.
    """
    change = series.diff()

    mean_up = change.clip(lower=0).rolling(window).mean()
    mean_down = (-change.clip(upper=0)).rolling(window).mean()

    relative_strength = mean_up / mean_down
    return 100 - (100 / (1 + relative_strength))

In [None]:
def generate_rsi_signals(
    price_df: pd.DataFrame,
    lookback: int = 14,
    long_rsi: int = 25,
    short_rsi: int = 75,
    exit_rsi: int = 50
):
    """
    RSI signal generator using full price DataFrame (expects 'Close').

    - Trade **signals** are based on RSI of CLOSE prices.
    - Signals are shifted for next-day execution (execution_signal).
    - Target direction is the ideal state (long/short/flat), used for valuation.

    Signal codes: 1 = enter long (flat and RSI < long_rsi), -1 = enter short
    (flat and RSI > short_rsi), -9 = exit long (long and RSI > exit_rsi),
    9 = exit short (short and RSI < exit_rsi), NaN = no signal.

    Args:
        price_df: price history with a 'Close' column. If selecting 'Close'
            yields a DataFrame (e.g. MultiIndex columns), its first column is used.
        lookback: RSI window length passed to `compute_rsi`.
        long_rsi: RSI level below which a long is opened when flat.
        short_rsi: RSI level above which a short is opened when flat.
        exit_rsi: RSI level that closes an open long (above) or short (below).

    Returns:
        execution_signal (pd.Series): shifted trade signal for execution (at next open)
        target_direction (pd.Series): desired directional exposure (1, -1, 0)
    """
    close = price_df['Close']
    if isinstance(close, pd.DataFrame):
        # Column selection on MultiIndex columns returns a frame; reduce it to a Series.
        close = close.iloc[:, 0]

    rsi_values = compute_rsi(close, lookback).to_numpy(dtype=float)

    n_rows = len(price_df)
    signal = np.full(n_rows, np.nan)
    target = np.zeros(n_rows, dtype=int)

    in_position = 0  # 1 = long, -1 = short, 0 = flat

    # Single pass over the rows. The first row is skipped, as in the original loop.
    for i in range(1, n_rows):
        # A position decided on bar i-1 is held from bar i onward, so record
        # the state *before* evaluating today's RSI.
        target[i] = in_position
        rsi_now = rsi_values[i]  # NaN compares False, so warm-up rows emit nothing

        if in_position == 0:
            if rsi_now < long_rsi:
                signal[i] = 1
                in_position = 1
            elif rsi_now > short_rsi:
                signal[i] = -1
                in_position = -1

        elif in_position == 1 and rsi_now > exit_rsi:
            signal[i] = -9
            in_position = 0

        elif in_position == -1 and rsi_now < exit_rsi:
            signal[i] = 9
            in_position = 0

    # Shift signal by 1 to reflect execution on the next trading day
    execution_signal = pd.Series(signal, index=price_df.index, name='signal').shift(1)
    target_direction = pd.Series(target, index=price_df.index, name='target_direction')

    return execution_signal, target_direction

# Construct Portfolio

In [None]:
# fetch_data() skips tickers whose download came back empty. Everything below
# indexes price_data by every configured ticker, so stop with a clear message
# instead of a bare KeyError.
missing_tickers = [ticker for ticker in tickers if ticker not in price_data]
if missing_tickers:
    raise ValueError(f"No price data available for: {missing_tickers}")

execution_signals = {}
target_directions = {}

for ticker in tickers:
    print(f"Generating signals for {ticker}")
    execution_signals[ticker], target_directions[ticker] = generate_rsi_signals(price_data[ticker])

# Clean trading dates for Mag 7: union of every ticker's dates, restricted to the backtest window
all_dates_set = set().union(*(df.index for df in price_data.values()))

full_union_dates = pd.DatetimeIndex(sorted(all_dates_set))
window_start = pd.to_datetime(start_date)
window_end = pd.to_datetime(end_date)
all_dates = full_union_dates[(full_union_dates >= window_start) & (full_union_dates <= window_end)]

In [None]:
# Align every ticker's series on the shared trading calendar (one column per ticker).
# Dates a ticker has no row for become NaN signals and a flat (0) target direction.
execution_signals_df = pd.DataFrame(execution_signals, index=all_dates)
target_directions_df = (
    pd.DataFrame(target_directions, index=all_dates)
    .fillna(0)
    .astype(int)
)


In [None]:
# Sanity check: both frames must cover the same dates and tickers. Fail loudly
# rather than printing a boolean that is easy to overlook.
assert execution_signals_df.shape == target_directions_df.shape, (
    f"Shape mismatch: signals {execution_signals_df.shape} vs "
    f"target directions {target_directions_df.shape}"
)

In [None]:
# helper functions to get open, close price given date and ticker name
def _price_field(date, ticker, price_data, field) -> float:
    """Return `field` for `ticker` on `date` as a float, or NaN if the date is absent."""
    frame = price_data[ticker]
    if date not in frame.index:
        return np.nan

    value = frame.loc[date, field]
    # With MultiIndex columns the lookup yields a Series/DataFrame rather than
    # a scalar; with flat columns it is already a scalar. Handle both.
    if isinstance(value, (pd.Series, pd.DataFrame)):
        flattened = value.to_numpy().ravel()
        if flattened.size == 0:
            return np.nan
        value = flattened[0]
    return float(value)


def close_price(date, ticker, price_data) -> float:
    """
    Close price of `ticker` on `date`.

    Returns NaN if `date` is not in the ticker's price index.
    """
    return _price_field(date, ticker, price_data, 'Close')


def open_price(date, ticker, price_data) -> float:
    """
    Open price of `ticker` on `date`.

    Returns NaN if `date` is not in the ticker's price index.
    """
    return _price_field(date, ticker, price_data, 'Open')


def liquidity_adjusted_trade(date, ticker, price_data, max_percentage=0.1):
    """
    Maximum number of whole shares tradable on `date`: `max_percentage` of that day's volume.

    Returns:
        int: the cap, rounded down to whole shares. 0 when no volume is
        available for the date (missing date or NaN volume).
    """
    available_volume = _price_field(date, ticker, price_data, 'Volume')
    if pd.isna(available_volume):
        return 0

    # Shares are indivisible here, so truncate instead of returning a fraction.
    return int(available_volume * max_percentage)



# Backtest Logic

In [None]:
def _update_cost_basis(held, avg_cost, delta, exec_price):
    """
    Apply a fill of `delta` signed shares (non-zero) to a position of `held` signed shares.

    Returns:
        (closed_shares, gross_realized_pnl, new_avg_cost) where `closed_shares`
        is how many shares of the existing position the fill closed and
        `gross_realized_pnl` is the profit on those shares before commission.
    """
    if held == 0 or (held > 0) == (delta > 0):
        # Opening or adding to a position: blend the fill into the average entry price.
        total_shares = abs(held) + abs(delta)
        blended_cost = (abs(held) * avg_cost + abs(delta) * exec_price) / total_shares
        return 0, 0.0, blended_cost

    closed_shares = min(abs(delta), abs(held))
    side = 1 if held > 0 else -1
    gross_realized_pnl = side * closed_shares * (exec_price - avg_cost)

    remaining = held + delta
    if remaining == 0:
        new_avg_cost = 0.0
    elif (remaining > 0) == (held > 0):
        new_avg_cost = avg_cost      # partially closed: basis unchanged
    else:
        new_avg_cost = exec_price    # crossed through zero: remainder opened at this fill
    return closed_shares, gross_realized_pnl, new_avg_cost


# --- Backtest state ---
portfolio_value = initial_capital
cash = initial_capital
holdings = {ticker: 0 for ticker in tickers}          # signed share counts (negative = short)
avg_cost = {ticker: 0.0 for ticker in tickers}        # average entry price per ticker
last_close = {ticker: np.nan for ticker in tickers}   # last seen close, for dates with no close
trade_log = []
portfolio_value_history = []
asset_value_rows = []   # per-date {ticker: end-of-day position value}
holdings_rows = []      # per-date {ticker: end-of-day share count}

# Kept so the name stays defined for other cells; this loop does not populate it.
amount_invested_history = pd.DataFrame(index=execution_signals_df.index, columns=tickers)

for date in execution_signals_df.index:
    signal_today = execution_signals_df.loc[date]

    # Trade only on dates where at least one ticker has an execution signal.
    # (A row of the signal frame is never `.empty`; "no signal" is all-NaN.)
    if signal_today.notna().any():
        directions = target_directions_df.loc[date]
        active_positions = int(directions.abs().sum())

        # Equal-weight the active positions, capped at max_weight of portfolio value.
        allocation = portfolio_value / active_positions if active_positions > 0 else 0
        ideal_invest_amt = max(0, min(allocation, portfolio_value * max_weight))

        for ticker in tickers:
            open_px = open_price(date, ticker, price_data)
            if pd.isna(open_px) or open_px <= 0:
                continue  # no usable open price for this ticker on this date

            # Signed target: long (+), short (-) or flat (0) according to the target direction.
            target_shares = int(directions[ticker]) * int(ideal_invest_amt // open_px)
            delta = target_shares - holdings[ticker]
            if abs(delta) <= min_shares:
                continue  # adjustment does not exceed the min_shares threshold

            # Cap the trade at the tradable fraction of the day's volume. With a large
            # initial capital and very low adjusted prices early in a stock's history,
            # the desired size can exceed what the market offers.
            max_tradable = liquidity_adjusted_trade(date, ticker, price_data)
            max_tradable = int(max_tradable) if pd.notna(max_tradable) else 0
            quantity = min(abs(delta), max_tradable)
            if quantity <= 0:
                continue

            is_buy = delta > 0
            # Slippage always works against us: pay more on buys, receive less on sells.
            exec_price = open_px * (1 + slippage) if is_buy else open_px * (1 - slippage)
            signed_quantity = quantity if is_buy else -quantity
            fee = quantity * exec_price * commission

            cash -= signed_quantity * exec_price + fee

            closed_shares, gross_pnl, avg_cost[ticker] = _update_cost_basis(
                holdings[ticker], avg_cost[ticker], signed_quantity, exec_price
            )
            holdings[ticker] += signed_quantity

            # Realized P&L on the closed shares, net of this trade's commission
            # attributable to them; NaN for trades that only open or add to a position.
            if closed_shares:
                pnl = gross_pnl - fee * closed_shares / quantity
            else:
                pnl = np.nan

            trade_log.append({
                'date': date,
                'ticker': ticker,
                'action': "buy" if is_buy else "sell",
                'shares': quantity,
                'price': exec_price,
                'commission': fee,
                'pnl': pnl
            })

    # Record end of day (EOD) values for each ticker, on every date.
    asset_values = {}
    for ticker in tickers:
        close_px = close_price(date, ticker, price_data)
        if pd.notna(close_px):
            last_close[ticker] = close_px
        mark = last_close[ticker]
        # Before a ticker has any price there is nothing held; value it at 0 rather than NaN.
        asset_values[ticker] = holdings[ticker] * mark if pd.notna(mark) else 0.0

    asset_value_rows.append(asset_values)
    holdings_rows.append(dict(holdings))

    portfolio_value = cash + sum(asset_values.values())
    portfolio_value_history.append(portfolio_value)  # For tracking portfolio value over time

# Build the history frames once, instead of writing cell-by-cell inside the loop.
single_asset_value_history = pd.DataFrame(
    asset_value_rows, index=execution_signals_df.index, columns=tickers, dtype=float
)
asset_holdings_history = pd.DataFrame(
    holdings_rows, index=execution_signals_df.index, columns=tickers
)

# Convert portfolio value history to a DataFrame for analysis
portfolio_value_df = pd.DataFrame(portfolio_value_history, columns=['portfolio_value'], index=execution_signals_df.index)


In [None]:
# Display the portfolio value series produced by the backtest loop (one row per date).
portfolio_value_df

# Performance Metrics
   - Total Return
   - Annual Return
   - Annual Volatility
   - Maximum Drawdown
   - Sharpe Ratio
   - Sortino Ratio
   - Total Number of Trades
   - Average Return per Trade
   - Win Rate
   - Expectancy


In [None]:

def calculate_performance_metrics(portfolio_value_df, trade_log, risk_free_rate=0.02, periods_per_year=252):
    """
    Summarise a backtest from its portfolio value series and trade log.

    Args:
        portfolio_value_df: DataFrame with a 'portfolio_value' column, one row
            per period in chronological order. It is not modified.
        trade_log: list of trade dicts. Per-trade statistics use only entries
            with a non-missing 'pnl' value; entries without one still count
            towards 'Total Number of Trades'.
        risk_free_rate: annual risk-free rate used for Sharpe and Sortino.
        periods_per_year: number of periods per year used for annualisation.

    Returns:
        dict with keys 'Total Return', 'Annual Return', 'Annual Volatility',
        'Maximum Drawdown', 'Sharpe Ratio', 'Sortino Ratio',
        'Total Number of Trades', 'Average Return per Trade', 'Win Rate',
        'Expectancy'. Ratios that cannot be computed are NaN; per-trade
        statistics are 0 when no trade carries a 'pnl'.

    Raises:
        ValueError: if `portfolio_value_df` has no rows.
    """
    values = portfolio_value_df['portfolio_value'].astype(float)
    if values.empty:
        raise ValueError("portfolio_value_df has no rows")

    # Work on a local series so the caller's DataFrame is left untouched.
    daily_returns = values.pct_change().dropna()

    # Total Return
    total_return = (values.iloc[-1] - values.iloc[0]) / values.iloc[0]

    # Annual Return: N values span N - 1 return periods.
    n_periods = len(values) - 1
    growth = 1 + total_return
    if n_periods <= 0:
        annual_return = np.nan
    elif growth <= 0:
        annual_return = -1.0  # total loss; avoids a fractional power of a non-positive base
    else:
        annual_return = growth ** (periods_per_year / n_periods) - 1

    # Annual Volatility (standard deviation of daily returns, annualized)
    annual_volatility = daily_returns.std() * np.sqrt(periods_per_year)

    # Maximum Drawdown: worst decline from a running peak.
    drawdown = values / values.cummax() - 1
    max_drawdown = drawdown.min()

    # Sharpe Ratio (using the risk-free rate, converted to a per-period rate)
    excess_return = daily_returns - risk_free_rate / periods_per_year
    excess_std = excess_return.std()
    if pd.notna(excess_std) and excess_std > 0:
        sharpe_ratio = excess_return.mean() / excess_std * np.sqrt(periods_per_year)
    else:
        sharpe_ratio = np.nan

    # Sortino Ratio: downside deviation is the root-mean-square of the negative
    # excess returns taken over all periods, not the std of the negative subset.
    if len(excess_return) > 0:
        downside_deviation = np.sqrt((np.minimum(excess_return, 0) ** 2).mean())
    else:
        downside_deviation = np.nan
    if pd.notna(downside_deviation) and downside_deviation > 0:
        sortino_ratio = excess_return.mean() / downside_deviation * np.sqrt(periods_per_year)
    else:
        sortino_ratio = np.nan

    # Total Number of Trades
    total_trades = len(trade_log)

    # Per-trade statistics over the trades that report a P&L.
    trade_pnls = [trade['pnl'] for trade in trade_log if pd.notna(trade.get('pnl'))]
    wins = [pnl for pnl in trade_pnls if pnl > 0]
    losses = [pnl for pnl in trade_pnls if pnl <= 0]

    # Average Return per Trade
    average_return_per_trade = sum(trade_pnls) / len(trade_pnls) if trade_pnls else 0

    # Win Rate (share of profitable trades)
    win_rate = len(wins) / len(trade_pnls) if trade_pnls else 0

    # Expectancy = P(win) * average win - P(loss) * average loss size
    average_win = sum(wins) / len(wins) if wins else 0
    average_loss = abs(sum(losses) / len(losses)) if losses else 0
    expectancy = win_rate * average_win - (1 - win_rate) * average_loss if trade_pnls else 0

    metrics = {
        'Total Return': total_return,
        'Annual Return': annual_return,
        'Annual Volatility': annual_volatility,
        'Maximum Drawdown': max_drawdown,
        'Sharpe Ratio': sharpe_ratio,
        'Sortino Ratio': sortino_ratio,
        'Total Number of Trades': total_trades,
        'Average Return per Trade': average_return_per_trade,
        'Win Rate': win_rate,
        'Expectancy': expectancy
    }

    return metrics

# Compute the metrics for the backtest, using the risk-free rate from the config
# cell rather than relying on the function's default.
metrics = calculate_performance_metrics(portfolio_value_df, trade_log, risk_free_rate=risk_free_rate)

# Print the results
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")


# Answers to Questions

1. In which month did the portfolio have its highest return, and which stock contributed the most to that return?
 - Best Month: Jan 2001, best stock: AAPL

In [None]:
# Calculate the monthly portfolio return by compounding the daily returns within
# each calendar month. The result is a month-end-indexed Series; it is kept
# separate from the daily frame because assigning it back would align on the
# daily index and leave almost every row NaN.
daily_returns = portfolio_value_df['portfolio_value'].pct_change()
monthly_returns = (1 + daily_returns).resample('ME').prod() - 1

# Find the month with the highest portfolio return
best_month = monthly_returns.idxmax()
highest_return = monthly_returns.max()

# Calculate the monthly pnl for individual assets.
# A change in position value mixes price moves with money added or removed by
# trading, so subtract the net amount spent on each asset (buys positive, sells
# negative). Commissions are not part of this per-asset figure.
asset_values = single_asset_value_history.astype(float).fillna(0.0)
value_change = asset_values - asset_values.shift(1, fill_value=0.0)

trades_df = pd.DataFrame(trade_log)
if trades_df.empty:
    net_purchases = pd.DataFrame(0.0, index=asset_values.index, columns=asset_values.columns)
else:
    signed_shares = trades_df['shares'].where(trades_df['action'] == 'buy', -trades_df['shares'])
    net_purchases = (
        trades_df.assign(flow=signed_shares * trades_df['price'])
        .pivot_table(index='date', columns='ticker', values='flow', aggfunc='sum')
        .reindex(index=asset_values.index, columns=asset_values.columns)
        .fillna(0.0)
    )

asset_pnl_df = (value_change - net_purchases).resample('ME').sum()

# Find which stock contributed the most to the highest return month
best_asset_contributor = asset_pnl_df.loc[best_month].idxmax()
highest_pnl = asset_pnl_df.loc[best_month, best_asset_contributor]

# Output results
print(f"Best Month: {best_month}, Portfolio Return: {highest_return}")
print(f"Best Asset: {best_asset_contributor}, Asset PNL: {highest_pnl}")


2. Did the portfolio outperform the S&P 500? If so, what is your rationale for the outperformance?


In [None]:
# Fetch the S&P 500 Total Return data (symbol: ^SP500TR) from Yahoo Finance over
# the same configured window as the backtest.
sp500_tr = yf.download('^SP500TR', start=start_date, end=end_date, auto_adjust=True)

# Selecting 'Close' yields a one-column DataFrame when the columns are a
# MultiIndex; reduce it to a Series so the values below are scalars.
sp500_close = sp500_tr['Close']
if isinstance(sp500_close, pd.DataFrame):
    sp500_close = sp500_close.iloc[:, 0]
sp500_close = sp500_close.dropna()

portfolio_values = portfolio_value_df['portfolio_value'].dropna()
if sp500_close.empty or portfolio_values.empty:
    raise ValueError("Need both benchmark and portfolio data to compare returns")

# Compare over the dates both series cover; the benchmark history may start
# later (or end earlier) than the portfolio's.
common_start = max(sp500_close.index[0], portfolio_values.index[0])
common_end = min(sp500_close.index[-1], portfolio_values.index[-1])
sp500_window = sp500_close.loc[common_start:common_end]
portfolio_window = portfolio_values.loc[common_start:common_end]
if sp500_window.empty or portfolio_window.empty:
    raise ValueError("Benchmark and portfolio have no overlapping dates")

# Calculate the total return of the S&P 500 Total Return (as a fraction, like my_return)
initial_sp500_value = sp500_window.iloc[0]
final_sp500_value = sp500_window.iloc[-1]
sp500_return = (final_sp500_value - initial_sp500_value) / initial_sp500_value

# Portfolio total return over the same window
my_return = (portfolio_window.iloc[-1] - portfolio_window.iloc[0]) / portfolio_window.iloc[0]

print(f"Comparison window: {common_start.date()} to {common_end.date()}")
print(f"S&P 500 TR total return: {sp500_return:.4f}")
print(f"Portfolio total return: {my_return:.4f}")


2. Did the portfolio outperform the S&P 500? If so, what is your rationale for the outperformance?
3. How would you evaluate whether this is a profitable strategy, and what tests would you conduct to assess its robustness?
4. Do you have any suggestions to improve the current strategy?

In [None]:
# Display the per-ticker end-of-day position values recorded by the backtest loop.
single_asset_value_history