<a href="https://colab.research.google.com/github/jonzyyyy/AlphaLab/blob/main/AlphaLab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1. Setup & Libraries

### Setting up necessary keys and env variables
(only required to run at the start of execution)

In [1]:
!pip install python-dotenv --quiet

from google.colab import drive
drive.mount('/content/drive')

# Copy Alpha Lab notebook to folder
!cp "/content/drive/MyDrive/Colab Notebooks/AlphaLab.ipynb" AlphaLab/

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
cp: cannot create regular file 'AlphaLab/': Not a directory


In [2]:
# Install the third-party packages imported in the next cell (yfinance, vectorbt, pyngrok).
# streamlit is not imported in the cells shown here - presumably used further down; verify.
!pip install yfinance --quiet
!pip install vectorbt --quiet
!pip install streamlit pyngrok --quiet

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/527.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m527.8/527.8 kB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m315.5/315.5 kB[0m [31m21.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m14.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.9/9.9 MB[0m [31m87.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m96.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.1/79.1 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
import yfinance as yf
import pandas as pd
import numpy as np
import vectorbt as vbt
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from pyngrok import ngrok
import os

The code cell below expects the required private API keys to be stored as Colab secrets.

API Keys required:
1. NGROK

In [4]:
# !ngrok config add-authtoken NGROK_API_KEY
# Read the ngrok token from the Colab secret named 'NGROK_API_KEY' and register it with
# pyngrok, so the token never has to be written into the notebook itself.
from google.colab import userdata
ngrok_token = userdata.get('NGROK_API_KEY')
ngrok.set_auth_token(ngrok_token)



In [5]:
# Parameters
SPLIT_RATIO = 0.7 # Fraction of the downloaded dates used as the training window; the rest is the test period
DATA_YEARS = 2 # Years of price history to download
TOP_N_STOCKS = 5 # Number of stocks the selector picks
benchmark_ticker = 'SPY' # Benchmark ticker
SUBSET_SIZE = 25 # Size of the optional "fast demo" ticker subset (only referenced in commented-out code in the cells shown here)
STARTING_AMOUNT = 10000 # Starting amount of portfolio

# --- Caching Configuration for S&P500 Data ---
# True: load CACHE_FILE when it exists (a live fetch still happens if the file is missing).
# False: always fetch live data and overwrite CACHE_FILE.
DEBUG = True
CACHE_FILE = 'sp500_market_caps.csv' # CSV written/read by the market-cap cell below

# 2. Download S&P 500 Stocks List
For simplicity, we'll fetch the tickers from Wikipedia

In [None]:
# --- Caching Logic ---
# Both branches must leave `market_caps_df` AND `tickers` defined: the price-download,
# train/test split and factor cells further down all read `tickers`.
if DEBUG and os.path.exists(CACHE_FILE):
    print(f"DEBUG mode is ON. Loading data from cache file: {CACHE_FILE}")
    market_caps_df = pd.read_csv(CACHE_FILE)
    # The cache only holds tickers for which a market cap was found, so the ticker
    # list is rebuilt from it (previously `tickers` was left undefined on this path).
    tickers = market_caps_df['Ticker'].tolist()
else:
    print("Fetching live data. DEBUG is OFF or cache file not found.")
    # Get S&P 500 tickers from Wikipedia
    sp500_url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    sp500_table = pd.read_html(sp500_url)
    sp500 = sp500_table[0]
    # Correctly handle potential discrepancies in ticker symbols (e.g., 'BRK.B' vs 'BRK-B')
    tickers = sp500['Symbol'].str.replace('.', '-', regex=False).tolist()

    # We'll store market caps in a list of (ticker, market_cap) tuples
    market_caps = []

    print("Downloading market cap data for S&P 500 tickers...")
    for ticker in tickers:
        try:
            info = yf.Ticker(ticker).info
            cap = info.get('marketCap', None)
            if cap is not None:
                market_caps.append((ticker, cap))
        except Exception:
            # Best effort: a ticker whose lookup fails is simply left out of the table.
            continue

    # Convert to DataFrame
    market_caps_df = pd.DataFrame(market_caps, columns=['Ticker', 'MarketCap'])

    # Sort by market cap, descending
    market_caps_df = market_caps_df.sort_values(by='MarketCap', ascending=False).reset_index(drop=True)

    # Save the fetched data to the cache file for future use
    market_caps_df.to_csv(CACHE_FILE, index=False)
    print(f"Live data saved to cache file: {CACHE_FILE}")

# --- Display the result ---
print("\nTop 5 S&P 500 companies by market cap:")
print(market_caps_df.head())

Fetching live data. DEBUG is OFF or cache file not found.
Downloading market cap data for S&P 500 tickers...


In [None]:
print(market_caps_df.head(5))  # Show top 5 largest S&P 500 stocks
# (Optional) Use a smaller subset for a fast demo: uncommenting the next line replaces
# `tickers` with the first SUBSET_SIZE entries of market_caps_df.
# tickers = market_caps_df['Ticker'][:SUBSET_SIZE].tolist()
# print(tickers)

In [None]:
# =======================
# 3. Download Price Data
# =======================
end = datetime.today()
start = end - timedelta(days=365*DATA_YEARS)  # Look back DATA_YEARS years (365-day years; leap days ignored)
# One download for every ticker. With group_by='ticker' the later cells index the result
# as price_data[ticker]['Close'], i.e. ticker first, then price field.
price_data = yf.download(tickers, start=start, end=end, group_by='ticker', auto_adjust=True, progress=False)
# print(price_data.head(3))

In [None]:
# =======================
# 4. Train-Test Split
# =======================
# The date index of the first ticker's Close series serves as the calendar for the split.
all_dates = price_data[tickers[0]]['Close'].index
# Chronological split: the first SPLIT_RATIO share of dates is the training window,
# the remaining (most recent) dates form the test window.
split_idx = int(len(all_dates) * SPLIT_RATIO)
train_dates = all_dates[:split_idx]
test_dates = all_dates[split_idx:]
print(len(all_dates), len(test_dates))

# Helper function to get price for a given ticker and period
def get_close_prices(ticker, dates):
    """Return the 'Close' series of `ticker` aligned to `dates`.

    Reads the module-level `price_data` object.

    Args:
        ticker: Key used to look the ticker up in `price_data`.
        dates: Index the result is reindexed to; dates absent from the
            ticker's series come back as NaN.

    Returns:
        pd.Series: Close prices indexed by `dates`. If the ticker (or its
        'Close' column) is not present in `price_data`, an all-NaN float
        Series indexed by `dates` is returned instead.
    """
    try:
        return price_data[ticker]['Close'].reindex(dates)
    except KeyError:
        # Only a missing ticker/column is treated as "no data". Anything else
        # (including KeyboardInterrupt, which a bare `except:` used to swallow)
        # propagates to the caller.
        return pd.Series(index=dates, dtype=float)

# Factor Based Algorithmic Equity Strategy

## Factors Used
1. Trailing P/E ratio - measures a company's current share price against its actual, historical earnings from the previous 12 months

    **Formula**: `Current Share Price / Trailing 12-Month EPS`

    **Limitation**: Past performance is not a guarantee of future results

2. The 6-Month Price Momentum factor is a strategy that aims to capitalize on existing market trends by identifying stocks that have performed well in the recent past

    **Formula**: `(Current Price / Price from 6 Months Ago) - 1`

    **Limitation**: A key risk is a momentum crash or reversal

## To consider
1. Forward P/E ratio - forward-looking metric that measures a company's current share price relative to its estimated future earnings per share (EPS)

    **Formula**: `Current Share Price / Forecasted 12-Month EPS`

    **Limitation**: Its primary weakness is that it's based on estimates, which can be inaccurate, overly optimistic, or change frequently.



In [None]:
class Factor:
    """Common interface for financial factors, plus a helper that tabulates them per ticker."""

    def __init__(self, name):
        # The name becomes the column label of this factor in the factor table.
        self.name = name

    def calculate(self, price_data_close, tickers):
        """
        Compute the factor value for the given tickers from the supplied price data.

        Subclasses override this; the base implementation always raises.

        Raises:
            NotImplementedError: Always, when called on the base class.
        """
        raise NotImplementedError("Subclass must implement abstract method")

    @staticmethod
    def calculate_factors(price_data_close, subset_tickers, factors):
        """
        Build a ticker-by-factor table for the tickers that have usable price data.

        Args:
            price_data_close (pd.DataFrame): 'Close' prices with tickers as columns.
            subset_tickers (list): Tickers to evaluate. Tickers that are not a column
                of `price_data_close`, or whose column holds only NaN, are ignored.
            factors (list): Factor objects; each contributes one column named
                after its `name`.

        Returns:
            pd.DataFrame: One row per ticker and one column per factor. Rows with a
            NaN in any factor column are dropped.
        """
        known_columns = price_data_close.columns
        usable_tickers = []
        for symbol in subset_tickers:
            if symbol not in known_columns:
                continue
            if price_data_close[symbol].dropna().empty:
                continue
            usable_tickers.append(symbol)
        print(f"Calculating factors for {len(usable_tickers)} valid tickers...")

        def _value_or_nan(factor, symbol):
            # Each factor receives a single-column copy of the prices plus a one-item
            # ticker list; a failing calculation is recorded as NaN, not raised.
            try:
                return factor.calculate(price_data_close[[symbol]].copy(), [symbol])
            except Exception:
                return np.nan

        columns = {
            factor.name: [_value_or_nan(factor, symbol) for symbol in usable_tickers]
            for factor in factors
        }

        # Tickers missing any factor value are removed from the result.
        return pd.DataFrame(columns, index=usable_tickers).dropna()


class Value(Factor):
    """Value factor based on Trailing P/E ratio.

    NOTE(review): the ratio is read from ``yf.Ticker(ticker).info`` at call time and
    the ``price_data_close`` argument is never used, so the result does not depend on
    which price history is passed in. It is a current snapshot, not a point-in-time value.
    """
    def __init__(self):
        super().__init__('pe_ratio')

    def calculate(self, price_data_close, tickers):
        """
        Looks up the Trailing P/E ratio for a given ticker.

        Args:
            price_data_close (pd.DataFrame): Unused; accepted to match the Factor interface.
            tickers (list): A list containing the single ticker symbol.

        Returns:
            float: The Trailing P/E ratio, or np.nan if `tickers` is not a one-item
            list, the lookup fails, or the reported value is missing or not numeric.
        """
        if not tickers or not isinstance(tickers, list) or len(tickers) != 1:
            return np.nan
        ticker = tickers[0]
        try:
            info = yf.Ticker(ticker).info
            # Coerce to float so the factor column stays numeric: an explicit None or a
            # non-numeric value raises here and is reported as NaN like any other failure.
            return float(info.get('trailingPE', np.nan))
        except Exception:
            return np.nan


class Momentum(Factor):
    """Price-momentum factor: return over roughly the last six months of trading days."""
    def __init__(self):
        super().__init__('6m_return')

    def calculate(self, price_data_close, tickers):
        """
        Computes the 6-month price return of one ticker.

        Args:
            price_data_close (pd.DataFrame): 'Close' prices containing a column for the ticker.
            tickers (list): A one-item list holding the ticker symbol.

        Returns:
            float: Latest non-NaN price divided by the 126th-from-last non-NaN price,
            minus 1. np.nan when `tickers` is not a one-item list, when there are 126
            or fewer non-NaN prices, or when the calculation raises.
        """
        is_single_ticker = bool(tickers) and isinstance(tickers, list) and len(tickers) == 1
        if not is_single_ticker:
            return np.nan

        try:
            history = price_data_close[tickers[0]].dropna()
            # 126 trading days is used as the approximation of six months.
            if len(history) <= 126:
                return np.nan
            latest_price = history.iloc[-1]
            reference_price = history.iloc[-126]
            return (latest_price / reference_price) - 1
        except Exception:
            return np.nan

### Calculating Initial Factors

The static method `calculate_factors` from the `Factor` class is used to perform the initial factor calculations. The following parameters are passed to the method:
* The relevant price data, specifically the `price_data_close` DataFrame.
* The list of tickers being considered, stored in `subset_tickers`.
* A list of the factor objects that need to be calculated.

**Note on Point-in-Time Data:** This initial calculation uses all available price data. This is not strictly point-in-time accurate as it includes future data relative to the start of the backtest. However, this is done for an initial assessment, and the main backtesting loop later recalculates these factors using only the price data available at each specific rebalance point. This step assumes the `price_data` and `tickers` variables have been defined by the earlier cells; it derives `subset_tickers_filtered` and `price_data_close_all` from them and passes those to `calculate_factors`.

In [None]:
# =======================
# 5. Factor Calculation (Value, Momentum) - Refactored
# =======================

# Split the requested tickers into those present in price_data and those missing.
# The top-level column labels are collected once into a set instead of being
# recomputed for every ticker.
downloaded_labels = set(price_data.columns.get_level_values(0))
available_tickers = [ticker for ticker in tickers if ticker in downloaded_labels]
missing_tickers = [ticker for ticker in tickers if ticker not in downloaded_labels]

if missing_tickers:
    print(f"Warning: Data not available for the following tickers in price_data: {missing_tickers}. Skipping these for factor calculation.")

subset_tickers_filtered = available_tickers
print(f"Filtered subset_tickers: {subset_tickers_filtered}")


# Instantiate the factor classes
value_factor = Value()
momentum_factor = Momentum()

# Create a list of factors to use
factors_to_calculate = [value_factor, momentum_factor]

# Build price_data_close_all: 'Close' prices only, one column per ticker.
# Note: This part still assumes 'price_data' is available and has the MultiIndex structure from yf.download
if isinstance(price_data.columns, pd.MultiIndex):
    # Select 'Close' for all tickers in subset_tickers_filtered, then drop the
    # price-field level (position 1, after the ticker level) so tickers are
    # single-level columns. Dropping by position works whether or not the level is
    # named, and droplevel returns a new frame, so price_data itself is untouched.
    price_data_close_all = price_data.loc[:, (subset_tickers_filtered, 'Close')].droplevel(1, axis=1)

else: # Handle case where price_data might not have a MultiIndex (e.g., single ticker download)
    # Assuming it's already the close series for a single ticker, convert to DataFrame.
    # The copy keeps the column rename below from leaking back into price_data.
    price_data_close_all = pd.DataFrame(price_data).copy()
    # Assuming subset_tickers_filtered has only one ticker in this case
    # This case might need adjustment if subset_tickers_filtered has multiple items but price_data is not MultiIndex
    if len(subset_tickers_filtered) == 1:
        price_data_close_all.columns = [subset_tickers_filtered[0]]
    else:
        print("Warning: price_data is not MultiIndex but subset_tickers_filtered has multiple items.")
        price_data_close_all = price_data[subset_tickers_filtered]


# Now use the refactored calculate_factors method
factor_df = Factor.calculate_factors(price_data_close_all, subset_tickers_filtered, factors_to_calculate)

In [None]:
class StockSelector:
    """Scores tickers on a composite of the P/E and momentum factors and picks the best N."""

    def __init__(self, factor_df, num_stocks_to_select):
        """
        Sets up a selector over a private copy of the factor table.

        Args:
            factor_df (pd.DataFrame): Factors as columns, tickers as index. The caller's
                frame is copied, so the rank/score columns added later stay internal.
            num_stocks_to_select (int): How many top-scoring tickers to pick.
        """
        self.num_stocks_to_select = num_stocks_to_select
        self.selected_tickers = []
        self.factor_df = factor_df.copy()

    def perform_selection(self):
        """
        Ranks the tickers, builds the composite score, and picks the best N.

        Adds 'pe_rank', 'mom_rank' and 'composite_score' columns to `self.factor_df`
        and stores the picks in `self.selected_tickers`.

        Returns:
            list: Selected ticker symbols, best composite score first. Empty when the
            factor table is empty or lacks the 'pe_ratio' / '6m_return' columns.
        """
        scores = self.factor_df
        if scores.empty:
            print("Factor DataFrame is empty. Cannot perform selection.")
            return []

        required_columns = ('pe_ratio', '6m_return')
        if not all(column in scores.columns for column in required_columns):
            print("Required factor columns ('pe_ratio', '6m_return') not found in factor_df for scoring.")
            self.selected_tickers = []
            return self.selected_tickers

        # Rank 1 is best: cheapest P/E and strongest momentum. NaN inputs rank last.
        scores['pe_rank'] = scores['pe_ratio'].rank(ascending=True, na_option='bottom')
        scores['mom_rank'] = scores['6m_return'].rank(ascending=False, na_option='bottom')

        # Equal-weight average of the two ranks; a missing rank is replaced by a value
        # one worse than the largest possible rank.
        worst_rank = scores.shape[0] + 1
        rank_total = scores['pe_rank'].fillna(worst_rank) + scores['mom_rank'].fillna(worst_rank)
        scores['composite_score'] = rank_total / 2

        candidates = scores.shape[0]
        if candidates >= self.num_stocks_to_select:
            best = scores.nsmallest(self.num_stocks_to_select, 'composite_score')
            self.selected_tickers = best.index.tolist()
            print(f"Selected top {self.num_stocks_to_select} stocks: {self.selected_tickers}")
        else:
            # Fewer candidates than requested: take all of them, still ordered by score.
            best = scores.nsmallest(candidates, 'composite_score')
            self.selected_tickers = best.index.tolist()
            print(f"Selected {len(self.selected_tickers)} stocks (fewer than requested {self.num_stocks_to_select}): {self.selected_tickers}")

        return self.selected_tickers

In [None]:
# =======================
# 6. Composite Scoring & Stock Selection
# =======================

# Instantiate the StockSelector class with the factor table from step 5;
# TOP_N_STOCKS is the number of tickers to pick.
stock_selector = StockSelector(factor_df, TOP_N_STOCKS)

# Use the perform_selection method to get the selected tickers
selected_tickers = stock_selector.perform_selection()

# Print the initial selected tickers
print("\nInitial selected stocks based on all data:", selected_tickers)

# Rebalancing and Backtesting strategy
This section gives a high-level explanation of how the rebalancing and backtesting strategy works. The process is designed to realistically simulate how an active investment strategy would be managed over time.

### **High-Level Strategy Explanation**

The strategy operates like a dynamic fund manager who periodically re-evaluates the market and adjusts their portfolio. It does this by looping through time in quarterly intervals and making investment decisions based **only on the information available at each point in time**.

Here is the step-by-step process:

1.  **Preparation**: The backtest first defines a "test period" (e.g., the last 30% of your historical data) and identifies the start date of each quarter within that period. These dates become the scheduled "rebalancing days."

2.  **The Simulation Loop**: The code then simulates the passing of time by looping through each rebalancing day. At each of these dates, it performs the following actions:
    * **Point-in-Time Analysis**: It looks backward, using only the historical data available up to that specific day to re-calculate the Value and Momentum factors for all stocks.
    * **Portfolio Re-Selection**: Based on these fresh, point-in-time factor scores, it re-ranks the stocks and selects the new "Top N" stocks to form the portfolio for the upcoming quarter.
    * **Holding Period**: The strategy then "holds" this newly selected portfolio until the next rebalancing day, recording its daily performance during this period.

3.  **Performance Calculation**: After the loop has run through all the quarters, the daily returns from each individual holding period are "stitched" together into one continuous performance history for the entire strategy.

4.  **Final Analysis**: This complete and stitched-together series of returns is used to calculate the final cumulative return and overall performance metrics like Annualized Return, Volatility, and the Sharpe Ratio.

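This point-in-time rebalancing approach is a robust method because it avoids **lookahead bias**, ensuring that investment decisions are never made using information that would not have been available at the time. Note that this currently holds only for the price-based momentum factor: the trailing P/E used by the `Value` factor is read from yfinance's `info` at run time rather than from historical data, so that factor is not yet point-in-time.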

## Future implementations

Metrics
1. Add Maximum Drawdown
2. Add Volatility
3. Alpha
4. Calmar Ratio (`Annualized Return / Max Drawdown`)
5. Max drawdown
6. Portfolio Turnover

Additional points
1. Accounting for transaction fees, slippage
2. Address survivorship bias

In [None]:
class Backtester:
    """
    Encapsulates the backtesting logic for a factor-based strategy.
    """
    def __init__(self, price_data_close, subset_tickers, split_ratio, top_n_stocks, rebalance_freq='QS'):
        """
        Initializes the Backtester.

        Args:
            price_data_close (pd.DataFrame): DataFrame of 'Close' prices with tickers as columns.
            subset_tickers (list): List of tickers to consider for selection.
            split_ratio (float): Ratio of data to use for training (defines the start of the test period).
            top_n_stocks (int): The number of top stocks to select at each rebalance.
            rebalance_freq (str): Frequency string for rebalancing (e.g., 'QS' for quarterly).
        """
        self.price_data_close = price_data_close.copy()
        self.subset_tickers = subset_tickers
        self.split_ratio = split_ratio
        self.top_n_stocks = top_n_stocks
        self.rebalance_freq = rebalance_freq
        self.starting_amount = 1 # default starting portfolio value is $1
        self.rebalance_dates = None
        self.selected_tickers = None

        # Initialize attributes to store results
        self.portfolio_returns = None
        self.cum_returns = None
        self.total_return = None
        self.annualized_return = None
        self.annualized_vol = None
        self.sharpe_ratio = None

    def _get_test_period_dates(self):
        """
        Identifies and returns the test period dates based on the split ratio.

        Returns:
            pd.DatetimeIndex: A DatetimeIndex containing the dates for the test period.
        """
        all_dates = self.price_data_close.index
        split_idx = int(len(all_dates) * self.split_ratio)
        test_dates = all_dates[split_idx:]
        return test_dates


    def _calculate_rebalance_dates(self, test_dates):
        """
        Calculates the dates on which to rebalance the portfolio.

        Args:
            test_dates (pd.DatetimeIndex): The dates for the test period.

        Returns:
            pd.DatetimeIndex: A DatetimeIndex containing the rebalance dates.
        """
        if test_dates.empty:
             print("Warning: Test period is empty. Adjust split ratio or data years.")
             return pd.DatetimeIndex([]) # Return an empty DatetimeIndex


        test_start_date, test_end_date = test_dates[0], test_dates[-1]
        rebalance_dates = pd.date_range(start=test_start_date, end=test_end_date, freq=self.rebalance_freq)

        # Ensure the first day of the test period is included if it's not a quarter start
        if test_start_date not in rebalance_dates:
            rebalance_dates = rebalance_dates.insert(0, test_start_date)
        return rebalance_dates


    def _perform_point_in_time_analysis(self, current_date):
        """
        Extracts point-in-time data, calculates factors, and selects top stocks for a given date.

        Args:
            current_date (pd.Timestamp): The date for which to perform the analysis.

        Returns:
            list: A list of selected ticker symbols for the current period, or an empty list if selection fails.
        """
        print(f"\nRebalancing for period starting {current_date.date()}...")
        point_in_time_data = self.price_data_close[self.price_data_close.index <= current_date]

        if point_in_time_data.empty:
            print("  -> Skipping: Not enough historical data to calculate factors.")
            return []

        print(f"Analysing point in time data from {point_in_time_data.index[0].date()} to {point_in_time_data.index[-1].date()}")

        # Recalculate factors using only point-in-time data
        value_factor = Value() # Assuming Value and Momentum classes are available
        momentum_factor = Momentum()
        factors_to_calculate = [Value(), Momentum()]

        # Use the calculate_factors static method from the Factor class
        factor_df_point_in_time = Factor.calculate_factors(point_in_time_data, self.subset_tickers, factors_to_calculate)


        if factor_df_point_in_time.empty:
            print("  -> Skipping: No valid tickers after factor calculation.")
            return []

        # Perform Composite Scoring and Select Top N stocks based on the point-in-time factor_df
        stock_selector_point_in_time = StockSelector(factor_df_point_in_time, self.top_n_stocks)
        current_period_selected_tickers = stock_selector_point_in_time.perform_selection()

        if not current_period_selected_tickers:
             print(f"  -> No stocks selected for period starting {current_date.date()}.")
             return [] # No stocks selected for this period
        else:
            print(f"  -> New portfolio selected for period starting {current_date.date()}: {current_period_selected_tickers}")
            return current_period_selected_tickers


    def _calculate_holding_period_returns(self, start_date, end_date, selected_tickers):
        """
        Calculates the returns for a given holding period and selected tickers.

        Args:
            start_date (pd.Timestamp): The start date of the holding period.
            end_date (pd.Timestamp): The end date of the holding period.
            selected_tickers (list): A list of ticker symbols held during the period.

        Returns:
            pd.Series: A time series of equal-weighted daily returns for the holding period, or an empty Series if no data.
        """
        if not selected_tickers:
            return pd.Series()

        holding_period_mask = (self.price_data_close.index >= start_date) & (self.price_data_close.index < end_date)
        holding_period_prices = self.price_data_close[holding_period_mask]

        if not holding_period_prices.empty and selected_tickers:
            valid_tickers_in_holding_period = [t for t in selected_tickers if t in holding_period_prices.columns]
            if valid_tickers_in_holding_period:
                period_prices_selected = holding_period_prices[valid_tickers_in_holding_period]
                period_returns = period_prices_selected.pct_change().fillna(0)
                # Assume equal weight for the period
                equal_weight_returns = period_returns.mean(axis=1)
                return equal_weight_returns
            else:
                print(f"  -> No valid selected tickers found in price data for holding period starting {start_date.date()}.")
                return pd.Series(index=holding_period_prices.index) # Return empty series with correct index
        else:
            print(f"  -> No data for holding period starting {start_date.date()}.")
            return pd.Series() # Return empty series


    def _concatenate_and_process_returns(self, all_period_returns, test_dates):
        """
        Concatenates all period returns, calculates cumulative returns, reindexes, and forward fills.

        Args:
            all_period_returns (list): A list of pandas Series, each containing returns for a holding period.
            test_dates (pd.DatetimeIndex): The full range of test period dates.
        """
        if all_period_returns:
            self.portfolio_returns = pd.concat(all_period_returns).sort_index()
            self.portfolio_returns = self.portfolio_returns[~self.portfolio_returns.index.duplicated(keep='first')]

            # 5. Calculate final cumulative returns
            self.cum_returns = (1 + self.portfolio_returns).cumprod()

            # Prepend a starting value of 1 at the very beginning of the test period
            actual_start_date = self.portfolio_returns.index[0] if not self.portfolio_returns.empty else test_dates[0]
            start_value_series = pd.Series([1.0], index=[actual_start_date - pd.Timedelta(days=1)])
            self.cum_returns = pd.concat([start_value_series, self.cum_returns])

            # Reindex to the full test_dates and forward fill
            self.cum_returns = self.cum_returns.reindex(test_dates)
            self.cum_returns = self.cum_returns.ffill()

            print("\n--- Backtest Returns Processed ---")
        else:
            print("\nBacktest could not be completed. No returns were generated.")
            self.portfolio_returns = pd.Series(index=test_dates if test_dates else None)
            self.cum_returns = pd.Series(index=test_dates if test_dates else None)


    def run_backtest(self):
        """
        Runs the point-in-time backtesting loop with quarterly rebalancing.
        """
        test_dates = self._get_test_period_dates()
        rebalance_dates = self._calculate_rebalance_dates(test_dates)

        if rebalance_dates.empty: # Check for empty DatetimeIndex
            print("No rebalance dates available")
            # Ensure returns are initialized if no rebalance dates
            self.portfolio_returns = pd.Series(index=test_dates)
            self.cum_returns = pd.Series(index=test_dates)
            return

        # Loop through rebalance dates to simulate the strategy
        all_period_returns = []
        last_rebalance_date = None
        last_selected_tickers = []

        print(f"Starting point-in-time backtest with {len(rebalance_dates)} rebalance periods...")

        for i, rebalance_date in enumerate(rebalance_dates):
            if last_rebalance_date is not None and last_selected_tickers:
                # Calculate returns for the previously selected portfolio during the last holding period
                holding_period_returns = self._calculate_holding_period_returns(last_rebalance_date, rebalance_date, last_selected_tickers)
                if not holding_period_returns.empty:
                    all_period_returns.append(holding_period_returns)


            # --- Point-in-Time Analysis for the current rebalance_date ---
            current_period_selected_tickers = self._perform_point_in_time_analysis(rebalance_date)

            last_selected_tickers = current_period_selected_tickers
            last_rebalance_date = rebalance_date

        # After the loop, process the returns from the last holding period
        if last_rebalance_date is not None and last_selected_tickers:
            final_holding_period_returns = self._calculate_holding_period_returns(last_rebalance_date, test_dates[-1] + pd.Timedelta(days=1), last_selected_tickers) # Add a day to include the last day
            if not final_holding_period_returns.empty:
                all_period_returns.append(final_holding_period_returns)

        # Concatenate and process all collected returns
        self._concatenate_and_process_returns(all_period_returns, test_dates)


    def calculate_performance_metrics(self):
        """
        Calculates and stores key performance metrics on the instance.

        Reads ``self.portfolio_returns`` (per-day portfolio returns) and
        ``self.cum_returns`` (cumulative growth curve) and writes
        ``self.total_return``, ``self.annualized_return``, ``self.annualized_vol``
        and ``self.sharpe_ratio``.

        When there is not enough data (including the case where no backtest has
        populated the series yet) the metrics are set to ``np.nan`` and a message
        is printed; nothing is raised.

        Notes:
            - Annualization assumes 252 trading days per year.
            - The Sharpe ratio is annualized return divided by annualized
              volatility (no risk-free rate is subtracted); it is 0.0 when the
              volatility is exactly zero.
        """
        def _mark_unavailable(message):
            # Single place that flags every metric as "not computable".
            print(message)
            self.total_return = np.nan
            self.annualized_return = np.nan
            self.annualized_vol = np.nan
            self.sharpe_ratio = np.nan

        # cum_returns can still be None when no backtest has run, so it must be
        # checked before len() is called on it.
        if (self.portfolio_returns is None or self.portfolio_returns.empty
                or self.cum_returns is None or len(self.cum_returns) <= 1):
            _mark_unavailable("\nNot enough data to calculate performance metrics.")
            return

        # Metrics are computed on the non-NaN part of the cumulative curve only.
        valid_cum_returns = self.cum_returns.dropna()
        if len(valid_cum_returns) <= 1:
            _mark_unavailable("\nNot enough data points to calculate performance metrics.")
            return

        self.total_return = valid_cum_returns.iloc[-1] - 1

        # Duration is the number of days that actually carry a return value.
        duration_trading_days = len(self.portfolio_returns.dropna())
        if duration_trading_days == 0:
            self.annualized_return = np.nan
            self.annualized_vol = np.nan
            self.sharpe_ratio = np.nan
            print("\nNot enough trading days in the backtest period to calculate annualized metrics.")
            return

        annualization_factor = 252 / duration_trading_days
        self.annualized_return = (1 + self.total_return) ** annualization_factor - 1
        # Volatility is based on daily returns, scaled to a year.
        self.annualized_vol = self.portfolio_returns.std() * np.sqrt(252)
        self.sharpe_ratio = self.annualized_return / self.annualized_vol if self.annualized_vol != 0 else 0.0

    def set_starting_amount(self, starting_amount):
        """
        Stores the starting portfolio value on the instance.

        The value is read back by display_performance_metrics (as the default
        amount) and by plot_cum_returns (to scale the plotted curve).

        Args:
            starting_amount: The initial portfolio value to scale results by.
        """
        self.starting_amount = starting_amount

    def display_performance_metrics(self, starting_amount=None):
        """
        Prints the stored performance metrics, scaled by a starting amount.

        If any metric is still missing (None/NaN) the metrics are computed first
        via calculate_performance_metrics. Metrics that remain unavailable are
        skipped, and a closing notice is printed when at least one is missing.

        Args:
            starting_amount: Optional amount used for the header and the final
                portfolio value; defaults to the instance's starting_amount.
        """
        def _any_metric_missing():
            current_values = [self.total_return, self.annualized_return, self.annualized_vol, self.sharpe_ratio]
            return any(pd.isna(current_values))

        # An explicit argument wins over the amount stored on the instance
        if starting_amount is None:
            display_amount = self.starting_amount
        else:
            display_amount = starting_amount

        print(f"\n--- Performance Metrics (Starting Amount: ${display_amount}) ---")

        # Compute lazily: only when something has not been calculated yet
        if _any_metric_missing():
            self.calculate_performance_metrics()

        total_return_available = not pd.isna(self.total_return)
        if total_return_available:
            print(f"Total Return (Test): {self.total_return:.2%}")
            final_value = display_amount * (1 + self.total_return)
            print(f"Final Portfolio Value: ${final_value:.2f}")

        annualized_return_available = not pd.isna(self.annualized_return)
        if annualized_return_available:
            print(f"Annualized Return (Test): {self.annualized_return:.2%}")

        volatility_available = not pd.isna(self.annualized_vol)
        if volatility_available:
            print(f"Annualized Volatility (Test): {self.annualized_vol:.2%}")

        sharpe_available = not pd.isna(self.sharpe_ratio)
        if sharpe_available:
            print(f"Sharpe Ratio (Test): {self.sharpe_ratio:.2f}")

        if _any_metric_missing():
            print("Performance metrics could not be calculated.")


    def plot_cum_returns(self):
        """
        Plots the cumulative-return curve scaled by the starting amount.

        Prints a notice instead of plotting when no cumulative returns exist
        (the attribute is None or the series is empty).
        """
        growth_curve = self.cum_returns
        if growth_curve is None or growth_curve.empty:
            print("\nNo cumulative returns data generated by the Backtester to plot.")
            return

        plt.figure(figsize=(12, 6))
        # Convert "growth of 1" into portfolio value for the chosen starting amount
        portfolio_value = growth_curve * self.starting_amount
        portfolio_value.plot()
        plt.title(f'Point-in-Time Strategy Cumulative Returns (Starting Amount: ${self.starting_amount})')
        plt.ylabel(f'Growth of ${self.starting_amount}')
        plt.grid()
        plt.show()

In [None]:
# Run the strategy backtest: build a Backtester from the notebook-level data and
# parameters, execute it, then report and plot the results.

# Instantiate the Backtester class, passing the necessary data and parameters
backtester = Backtester(
    price_data_close=price_data_close_all, # Use the price_data_close_all DataFrame with single-level columns
    subset_tickers=tickers, # Use the 'tickers' variable which holds the subset
    split_ratio=SPLIT_RATIO,
    top_n_stocks=TOP_N_STOCKS
)

# Set the starting portfolio value used to scale the reported and plotted results, then run the backtest
backtester.set_starting_amount(STARTING_AMOUNT)
backtester.run_backtest()

# Print the performance metrics (display_performance_metrics computes them first if they are still missing)
backtester.display_performance_metrics()

# Plot the cumulative returns stored on the backtester instance, scaled by the starting amount
backtester.plot_cum_returns()



In [None]:
# =======================
# 8. Results Visualization (with Benchmark)
# =======================

# 1. Work out the test period the benchmark has to cover.
# Prefer the index of the backtester's cumulative returns; if the backtest produced
# nothing, fall back to the same train/test split of the raw price data.
has_portfolio_curve = backtester.cum_returns is not None and not backtester.cum_returns.empty

if has_portfolio_curve:
    test_period_start = backtester.cum_returns.index[0]
    test_period_end = backtester.cum_returns.index[-1]
else:
    print("Backtester cumulative returns are not available to define the benchmark period.")
    all_dates = price_data_close_all.index
    split_idx = int(len(all_dates) * SPLIT_RATIO)
    test_dates = all_dates[split_idx:]
    if not test_dates.empty:
        test_period_start = test_dates[0]
        test_period_end = test_dates[-1]
    else:
        print("Cannot determine benchmark period.")
        test_period_start = None
        test_period_end = None


if test_period_start is not None and test_period_end is not None:
    # yfinance treats `end` as exclusive, so request one extra day to keep the last test day.
    benchmark_ticker_data = yf.download(
        benchmark_ticker,
        start=test_period_start,
        end=test_period_end + pd.Timedelta(days=1),
        auto_adjust=True,
        progress=False,
    )
    if benchmark_ticker_data is not None and not benchmark_ticker_data.empty:
        benchmark_ticker_close_prices = benchmark_ticker_data['Close']
        # 'Close' may come back as a one-column DataFrame (ticker-level columns);
        # reduce it to a Series so plotting and annotation work on scalars.
        if isinstance(benchmark_ticker_close_prices, pd.DataFrame):
            benchmark_ticker_close_prices = benchmark_ticker_close_prices.iloc[:, 0]

        # 2. Calculate the benchmark's daily returns and cumulative returns
        benchmark_returns = benchmark_ticker_close_prices.pct_change().fillna(0)
        benchmark_cum_returns = (1 + benchmark_returns).cumprod()

        # Ensure the starting value of benchmark is 1.0 for comparison
        if not benchmark_cum_returns.empty:
            benchmark_cum_returns = benchmark_cum_returns / benchmark_cum_returns.iloc[0]

        # Align the two time series on a common index and forward fill the gaps.
        # Without a portfolio curve only the benchmark is aligned and plotted.
        if has_portfolio_curve:
            common_index = benchmark_cum_returns.index.union(backtester.cum_returns.index)
            cum_returns_aligned = backtester.cum_returns.reindex(common_index).ffill()
        else:
            common_index = benchmark_cum_returns.index
            cum_returns_aligned = pd.Series(index=common_index, dtype=float)
        benchmark_cum_returns_aligned = benchmark_cum_returns.reindex(common_index).ffill()

        # 3. Plot both on the same graph
        plt.figure(figsize=(12, 6))

        # Plot your portfolio (skipped when the backtest produced no curve)
        if has_portfolio_curve:
            plt.plot(cum_returns_aligned.index, cum_returns_aligned.values,
                     label='My Portfolio', linewidth=2)

        # Plot the benchmark
        plt.plot(benchmark_cum_returns_aligned.index, benchmark_cum_returns_aligned.values,
                 linestyle='--', label=benchmark_ticker, linewidth=2)

        # Annotate final values, using the last non-NaN point of each curve
        portfolio_points = cum_returns_aligned.dropna()
        if not portfolio_points.empty:
            last_portfolio_date = portfolio_points.index[-1]
            last_portfolio_value = portfolio_points.iloc[-1]
            plt.text(last_portfolio_date, last_portfolio_value,
                     f"{last_portfolio_value:.5f}",
                     color='C0', fontsize=12, va='center', ha='left', fontweight='bold')

        benchmark_points = benchmark_cum_returns_aligned.dropna()
        if not benchmark_points.empty:
            last_benchmark_date = benchmark_points.index[-1]
            last_benchmark_value = benchmark_points.iloc[-1]
            plt.text(last_benchmark_date, last_benchmark_value,
                     f"{last_benchmark_value:.5f}",
                     color='C1', fontsize=12, va='center', ha='left', fontweight='bold')

        plt.title(f'Cumulative Return: My Portfolio vs {benchmark_ticker}')
        plt.xlabel('Date')
        plt.ylabel('Growth of $1')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()
    else:
        print("Benchmark data download failed.")
else:
    print("Cannot plot benchmark due to undefined period.")

# Save the Streamlit app (the cell below) to app.py

In [None]:
%%writefile app.py
# This entire cell will be saved as app.py


import streamlit as st
import plotly.express as px
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import yfinance as yf
import time # Import time for simulating progress
import os # Import os for checking file existence


# Define functions (copied from the Colab notebook)

# --- Caching Configuration for S&P500 Data ---
# CSV file in which download_sp500_tickers() stores the fetched (Ticker, MarketCap)
# table, and from which it reloads that table when called with debug=True.
# The name is a relative path.
CACHE_FILE = 'sp500_market_caps.csv'


# --- Helper function to download S&P 500 tickers ---
def download_sp500_tickers(debug=True):
    """
    Downloads S&P 500 tickers from Wikipedia and fetches their market caps.

    With ``debug=True`` and an existing cache file (``CACHE_FILE``) the cached
    table is returned instead of hitting the network, provided it is readable,
    non-empty and contains the expected columns; otherwise live data is fetched.
    Freshly fetched data is written back to the cache file.

    Args:
        debug (bool): When True, prefer the CSV cache over a live download.

    Returns:
        pd.DataFrame: Columns ``Ticker`` and ``MarketCap``. Live data is sorted
        by market cap, descending. An empty frame with those columns is returned
        when nothing could be fetched; errors are reported through Streamlit
        rather than raised.
    """
    required_columns = ['Ticker', 'MarketCap']

    if debug and os.path.exists(CACHE_FILE):
        try:
            cached_df = pd.read_csv(CACHE_FILE)
        except Exception as e:
            st.warning(f"Error loading cache file {CACHE_FILE}: {e}. Attempting to fetch live data.")
            # Fall through to live data fetching if cache loading fails
        else:
            # Only trust a cache that actually holds the expected table
            if not cached_df.empty and set(required_columns).issubset(cached_df.columns):
                return cached_df
            st.warning(f"Cache file {CACHE_FILE} is empty or missing expected columns. Attempting to fetch live data.")

    # Get S&P 500 tickers from Wikipedia
    try:
        sp500_url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
        sp500 = pd.read_html(sp500_url)[0]
        # Correctly handle potential discrepancies in ticker symbols (e.g., 'BRK.B' vs 'BRK-B')
        tickers = sp500['Symbol'].str.replace('.', '-', regex=False).tolist()

        # Market caps are collected as (ticker, cap) tuples
        market_caps = []
        for ticker in tickers:
            try:
                cap = yf.Ticker(ticker).info.get('marketCap', None)
            except Exception:
                # Best effort: a ticker whose info cannot be fetched is skipped silently
                continue
            if cap is not None:
                market_caps.append((ticker, cap))

        market_caps_df = pd.DataFrame(market_caps, columns=required_columns)

        if market_caps_df.empty:
            st.error("Failed to fetch any S&P 500 market cap data.")
            return pd.DataFrame(columns=required_columns)

        # Sort by market cap, descending
        market_caps_df = market_caps_df.sort_values(by='MarketCap', ascending=False).reset_index(drop=True)

        # Save the fetched data to the cache file for future use; failing to cache is not fatal
        try:
            market_caps_df.to_csv(CACHE_FILE, index=False)
        except Exception as e:
            st.warning(f"Could not save market cap data to cache file {CACHE_FILE}: {e}")

        return market_caps_df

    except Exception as e:
        st.error(f"Error fetching S&P 500 tickers from Wikipedia: {e}")
        return pd.DataFrame(columns=required_columns)


# --- Helper function to download price data ---
def download_price_data(tickers, start_date, end_date):
    """
    Downloads historical price data for a list of tickers via yfinance.

    Args:
        tickers: Ticker symbol(s) passed straight to ``yf.download``.
        start_date: Start of the download window.
        end_date: End of the download window.

    Returns:
        pd.DataFrame: The downloaded data (grouped by ticker, auto-adjusted),
        or an empty DataFrame when nothing was downloaded or an error occurred.
        Problems are reported through Streamlit instead of being raised.
    """
    try:
        downloaded = yf.download(
            tickers,
            start=start_date,
            end=end_date,
            group_by='ticker',
            auto_adjust=True,
            progress=False,
        )
        if not downloaded.empty:
            return downloaded
        st.warning(f"No price data downloaded for tickers: {tickers}")
    except Exception as e:
        st.error(f"Error downloading price data for tickers {tickers}: {e}")
    # Reached for both "nothing downloaded" and "download failed"
    return pd.DataFrame()


# =======================
# 5. Factor Calculation (Value, Momentum) - Refactored
# =======================

class Factor:
    """Common interface for per-ticker financial factors, identified by a name."""

    def __init__(self, name):
        # The name becomes the column label in the DataFrame built by calculate_factors
        self.name = name

    def calculate(self, price_data_close, tickers):
        """
        Computes the factor value from the supplied price data.

        Subclasses must override this; the base implementation always raises.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError("Subclass must implement abstract method")

    @staticmethod
    def calculate_factors(price_data_close, subset_tickers, factors):
        """
        Builds a ticker-by-factor table for the requested tickers.

        Tickers that are missing from ``price_data_close`` or have no non-NaN
        prices are ignored. Each factor is evaluated one ticker at a time on a
        copy of that ticker's single-column frame; a factor that raises yields
        NaN for that ticker. Rows containing any NaN are dropped at the end.

        Args:
            price_data_close (pd.DataFrame): 'Close' prices with tickers as columns.
            subset_tickers (list): Tickers to calculate factors for.
            factors (list): Factor objects to evaluate.

        Returns:
            pd.DataFrame: Factors as columns (named by ``factor.name``), tickers as index.
        """
        known_columns = price_data_close.columns
        usable_tickers = []
        for symbol in subset_tickers:
            if symbol not in known_columns:
                continue
            if price_data_close[symbol].dropna().empty:
                continue
            usable_tickers.append(symbol)

        def _value_or_nan(factor, symbol):
            # Individual failures are deliberately silent and recorded as NaN
            try:
                return factor.calculate(price_data_close[[symbol]].copy(), [symbol])
            except Exception:
                return np.nan

        columns_by_factor = {}
        for factor in factors:
            columns_by_factor[factor.name] = [_value_or_nan(factor, symbol) for symbol in usable_tickers]

        factor_table = pd.DataFrame(columns_by_factor, index=usable_tickers)
        # Keep only tickers for which every factor is available
        return factor_table.dropna()


class Value(Factor):
    """Value factor: the trailing P/E ratio reported by yfinance, stored under 'pe_ratio'."""

    def __init__(self):
        super().__init__('pe_ratio')

    def calculate(self, price_data_close, tickers):
        """
        Looks up the trailing P/E ratio for a single ticker.

        The ``price_data_close`` argument is accepted for interface compatibility
        but is not used; the ratio is read from ``yf.Ticker(...).info``.
        NOTE(review): that value is whatever yfinance reports at call time, so it
        is presumably not point-in-time for historical rebalance dates - confirm.

        Args:
            price_data_close (pd.DataFrame): 'Close' prices for a single ticker (unused).
            tickers (list): A list containing exactly one ticker symbol.

        Returns:
            float: The trailing P/E ratio, or np.nan if the input is not a
            one-element list, the field is missing, or the lookup fails.
        """
        is_single_ticker_list = bool(tickers) and isinstance(tickers, list) and len(tickers) == 1
        if not is_single_ticker_list:
            return np.nan

        symbol = tickers[0]
        try:
            return yf.Ticker(symbol).info.get('trailingPE', np.nan)
        except Exception:
            # Lookup problems for an individual ticker are deliberately silent
            return np.nan


class Momentum(Factor):
    """Momentum factor based on the 6-month (by default 126 trading days) price return."""

    def __init__(self, lookback_days=126):
        """
        Args:
            lookback_days (int): Number of trading days the return is measured
                over. The default of 126 approximates six months.
        """
        super().__init__('6m_return')
        self.lookback_days = lookback_days

    def calculate(self, price_data_close, tickers):
        """
        Calculates the price return over the lookback window for a given ticker.

        The return compares the latest non-NaN price with the price
        ``lookback_days`` observations earlier, which requires at least
        ``lookback_days + 1`` non-NaN prices.

        Args:
            price_data_close (pd.DataFrame): DataFrame of 'Close' prices for a single ticker.
            tickers (list): A list containing the single ticker symbol.

        Returns:
            float: The lookback return, or np.nan if the input is not a
            one-element list, there is insufficient data, the starting price is
            zero, or the calculation fails.
        """
        if not tickers or not isinstance(tickers, list) or len(tickers) != 1:
            return np.nan
        ticker = tickers[0]
        try:
            prices = price_data_close[ticker].dropna()
            if len(prices) <= self.lookback_days:
                return np.nan
            # A return over N periods spans N + 1 observations, hence the extra step back
            start_price = prices.iloc[-(self.lookback_days + 1)]
            if start_price == 0:
                # Avoid an infinite/undefined return
                return np.nan
            return (prices.iloc[-1] / start_price) - 1
        except Exception:
            # Calculation problems for an individual ticker are deliberately silent
            return np.nan

# =======================
# 6. Composite Scoring & Stock Selection
# =======================

class StockSelector:
    """Ranks tickers on a composite of value and momentum and picks the best ones."""

    def __init__(self, factor_df, num_stocks_to_select):
        """
        Args:
            factor_df (pd.DataFrame): Factors as columns, tickers as index. A copy
                is stored, so the caller's frame is not modified.
            num_stocks_to_select (int): The number of top stocks to select.
        """
        self.num_stocks_to_select = num_stocks_to_select
        self.selected_tickers = []
        self.factor_df = factor_df.copy()

    def perform_selection(self):
        """
        Scores every ticker and selects the top N.

        Requires the 'pe_ratio' and '6m_return' columns. Lower P/E and higher
        momentum rank better; the composite score is the mean of the two ranks
        and the tickers with the smallest scores win. If fewer tickers than
        requested are available, all of them are returned. The rank and score
        columns are added to ``self.factor_df``.

        Returns:
            list: Selected ticker symbols (empty when the factor table is empty
            or a required column is missing).
        """
        scores = self.factor_df
        if scores.empty:
            return []

        has_required_columns = all(column in scores.columns for column in ('pe_ratio', '6m_return'))
        if not has_required_columns:
            self.selected_tickers = []
            return self.selected_tickers

        # NaN factor values are pushed to the worst rank
        scores['pe_rank'] = scores['pe_ratio'].rank(ascending=True, na_option='bottom')
        scores['mom_rank'] = scores['6m_return'].rank(ascending=False, na_option='bottom')

        # Any rank still missing gets a value beyond the largest possible rank
        row_count = scores.shape[0]
        worst_rank = row_count + 1
        pe_component = scores['pe_rank'].fillna(worst_rank)
        momentum_component = scores['mom_rank'].fillna(worst_rank)
        scores['composite_score'] = (pe_component + momentum_component) / 2

        # Take the requested number, or everything when fewer tickers are available
        pick_count = min(self.num_stocks_to_select, row_count)
        self.selected_tickers = scores.nsmallest(pick_count, 'composite_score').index.tolist()
        return self.selected_tickers

# =======================
# 7. Backtesting (Refactored into Class)
# =======================
class Backtester:
    """
    Encapsulates the backtesting logic for a factor-based strategy.
    """
    def __init__(self, price_data_close, subset_tickers, split_ratio, top_n_stocks, rebalance_freq='QS'):
        """
        Splits data, calculates rebalance dates.

        Args:
            price_data_close (pd.DataFrame): DataFrame of 'Close' prices with tickers as columns.
            subset_tickers (list): List of tickers to consider for selection.
            split_ratio (float): Ratio of data to use for training (defines the start of the test period).
            top_n_stocks (int): The number of top stocks to select at each rebalance.
            rebalance_freq (str): Frequency string for rebalancing (e.g., 'QS' for quarterly).
        """
        self.price_data_close = price_data_close.copy()
        self.subset_tickers = subset_tickers
        self.split_ratio = split_ratio
        self.top_n_stocks = top_n_stocks
        self.rebalance_freq = rebalance_freq
        self.starting_amount = 1 # default starting portfolio value is $1
        self.rebalance_dates = None
        self.selected_tickers = None

        # Initialize attributes to store results
        self.portfolio_returns = None
        self.cum_returns = None
        self.total_return = None
        self.annualized_return = None
        self.annualized_vol = None
        self.sharpe_ratio = None

    def _get_test_period_dates(self):
        """
        Identifies and returns the test period dates based on the split ratio.

        Returns:
            pd.DatetimeIndex: A DatetimeIndex containing the dates for the test period.
        """
        all_dates = self.price_data_close.index
        split_idx = int(len(all_dates) * self.split_ratio)
        test_dates = all_dates[split_idx:]
        return test_dates


    def _calculate_rebalance_dates(self, test_dates):
        """
        Calculates the dates on which to rebalance the portfolio.

        Args:
            test_dates (pd.DatetimeIndex): The dates for the test period.

        Returns:
            pd.DatetimeIndex: A DatetimeIndex containing the rebalance dates.
        """
        if test_dates.empty:
             # print("Warning: Test period is empty. Adjust split ratio or data years.") # Moved print to streamlit logic
             return pd.DatetimeIndex([]) # Return an empty DatetimeIndex


        test_start_date, test_end_date = test_dates[0], test_dates[-1]
        rebalance_dates = pd.date_range(start=test_start_date, end=test_end_date, freq=self.rebalance_freq)

        # Ensure the first day of the test period is included if it's not a quarter start
        if test_start_date not in rebalance_dates:
            rebalance_dates = rebalance_dates.insert(0, test_start_date)
        return rebalance_dates


    def _perform_point_in_time_analysis(self, current_date):
        """
        Extracts point-in-time data, calculates factors, and selects top stocks for a given date.

        Args:
            current_date (pd.Timestamp): The date for which to perform the analysis.

        Returns:
            list: A list of selected ticker symbols for the current period, or an empty list if selection fails.
        """
        # print(f"\nRebalancing for period starting {current_date.date()}...") # Moved print to streamlit logic
        point_in_time_data = self.price_data_close[self.price_data_close.index <= current_date]

        if point_in_time_data.empty:
            # print("  -> Skipping: Not enough historical data to calculate factors.") # Moved print to streamlit logic
            return []

        # print(f"Analysing point in time data from {point_in_time_data.index[0].date()} to {point_in_time_data.index[-1].date()}") # Moved print to streamlit logic

        # Recalculate factors using only point-in-time data
        value_factor = Value() # Assuming Value and Momentum classes are available
        momentum_factor = Momentum()
        factors_to_calculate = [Value(), Momentum()]

        # Use the calculate_factors static method from the Factor class
        factor_df_point_in_time = Factor.calculate_factors(point_in_time_data, self.subset_tickers, factors_to_calculate)


        if factor_df_point_in_time.empty:
            # print("  -> Skipping: No valid tickers after factor calculation.") # Moved print to streamlit logic
            return []

        # Perform Composite Scoring and Select Top N stocks based on the point-in-time factor_df
        stock_selector_point_in_time = StockSelector(factor_df_point_in_time, self.top_n_stocks)
        current_period_selected_tickers = stock_selector_point_in_time.perform_selection()

        if not current_period_selected_tickers:
             # print(f"  -> No stocks selected for period starting {current_date.date()}.") # Moved print to streamlit logic
             return [] # No stocks selected for this period
        else:
            # print(f"  -> New portfolio selected for period starting {current_date.date()}: {current_period_selected_tickers}") # Moved print to streamlit logic
            return current_period_selected_tickers


    def _calculate_holding_period_returns(self, start_date, end_date, selected_tickers):
        """
        Calculates the returns for a given holding period and selected tickers.

        Args:
            start_date (pd.Timestamp): The start date of the holding period.
            end_date (pd.Timestamp): The end date of the holding period.
            selected_tickers (list): A list of ticker symbols held during the period.

        Returns:
            pd.Series: A time series of equal-weighted daily returns for the holding period, or an empty Series if no data.
        """
        if not selected_tickers:
            return pd.Series()

        holding_period_mask = (self.price_data_close.index >= start_date) & (self.price_data_close.index < end_date)
        holding_period_prices = self.price_data_close[holding_period_mask]

        if not holding_period_prices.empty and selected_tickers:
            valid_tickers_in_holding_period = [t for t in selected_tickers if t in holding_period_prices.columns]
            if valid_tickers_in_holding_period:
                period_prices_selected = holding_period_prices[valid_tickers_in_holding_period]
                period_returns = period_prices_selected.pct_change().fillna(0)
                # Assume equal weight for the period
                equal_weight_returns = period_returns.mean(axis=1)
                return equal_weight_returns
            else:
                # print(f"  -> No valid selected tickers found in price data for holding period starting {start_date.date()}.") # Moved print to streamlit logic
                return pd.Series(index=holding_period_prices.index) # Return empty series with correct index
        else:
            # print(f"  -> No data for holding period starting {start_date.date()}.") # Moved print to streamlit logic
            return pd.Series() # Return empty series


    def _concatenate_and_process_returns(self, all_period_returns, test_dates):
        """
        Concatenates all period returns, calculates cumulative returns, reindexes, and forward fills.

        Args:
            all_period_returns (list): A list of pandas Series, each containing returns for a holding period.
            test_dates (pd.DatetimeIndex): The full range of test period dates.
        """
        if all_period_returns:
            self.portfolio_returns = pd.concat(all_period_returns).sort_index()
            self.portfolio_returns = self.portfolio_returns[~self.portfolio_returns.index.duplicated(keep='first')]

            # 5. Calculate final cumulative returns
            self.cum_returns = (1 + self.portfolio_returns).cumprod()

            # Prepend a starting value of 1 at the very beginning of the test period
            actual_start_date = self.portfolio_returns.index[0] if not self.portfolio_returns.empty else test_dates[0]
            start_value_series = pd.Series([1.0], index=[actual_start_date - pd.Timedelta(days=1)])
            self.cum_returns = pd.concat([start_value_series, self.cum_returns])

            # Reindex to the full test_dates and forward fill
            self.cum_returns = self.cum_returns.reindex(test_dates)
            self.cum_returns = self.cum_returns.ffill()

            # print("\n--- Backtest Returns Processed ---") # Moved print to streamlit logic
        else:
            # print("\nBacktest could not be completed. No returns were generated.") # Moved print to streamlit logic
            self.portfolio_returns = pd.Series(index=test_dates if test_dates is not None and not test_dates.empty else None)
            self.cum_returns = pd.Series(index=test_dates if test_dates is not None and not test_dates.empty else None)



    def run_backtest(self):
        """
        Runs the point-in-time backtesting loop with quarterly rebalancing.
        """
        test_dates = self._get_test_period_dates()
        rebalance_dates = self._calculate_rebalance_dates(test_dates)

        if rebalance_dates.empty: # Check for empty DatetimeIndex
            # print("No rebalance dates available") # Moved print to streamlit logic
            # Ensure returns are initialized if no rebalance dates
            self.portfolio_returns = pd.Series(index=test_dates if test_dates is not None and not test_dates.empty else None)
            self.cum_returns = pd.Series(index=test_dates if test_dates is not None and not test_dates.empty else None)
            return

        # Loop through rebalance dates to simulate the strategy
        all_period_returns = []
        last_rebalance_date = None
        last_selected_tickers = []

        # print(f"Starting point-in-time backtest with {len(rebalance_dates)} rebalance periods...") # Moved print to streamlit logic

        for i, rebalance_date in enumerate(rebalance_dates):
            if last_rebalance_date is not None and last_selected_tickers:
                # Calculate returns for the previously selected portfolio during the last holding period
                holding_period_returns = self._calculate_holding_period_returns(last_rebalance_date, rebalance_date, last_selected_tickers)
                if not holding_period_returns.empty:
                    all_period_returns.append(holding_period_returns)


            # --- Point-in-Time Analysis for the current rebalance_date ---
            current_period_selected_tickers = self._perform_point_in_time_analysis(rebalance_date)

            last_selected_tickers = current_period_selected_tickers
            last_rebalance_date = rebalance_date

        # After the loop, process the returns from the last holding period
        if last_rebalance_date is not None and last_selected_tickers and test_dates is not None and not test_dates.empty:
            final_holding_period_returns = self._calculate_holding_period_returns(last_rebalance_date, test_dates[-1] + pd.Timedelta(days=1), last_selected_tickers) # Add a day to include the last day
            if not final_holding_period_returns.empty:
                all_period_returns.append(final_holding_period_returns)

        # Concatenate and process all collected returns
        self._concatenate_and_process_returns(all_period_returns, test_dates)


    def calculate_performance_metrics(self):
        """
        Derive total return, annualized return, annualized volatility and Sharpe ratio.

        Reads ``self.portfolio_returns`` and ``self.cum_returns`` and writes
        ``self.total_return``, ``self.annualized_return``, ``self.annualized_vol``
        and ``self.sharpe_ratio``. All four are set to NaN when there is too little
        data: missing/empty returns, fewer than two (non-NaN) cumulative-return
        points, or no non-NaN period returns.

        The Sharpe ratio computed here is annualized return divided by annualized
        volatility (no risk-free rate is subtracted) and falls back to 0.0 when
        the volatility is zero or NaN. Annualization assumes 252 periods per year.
        """
        period_returns = self.portfolio_returns
        growth_curve = self.cum_returns

        returns_unavailable = period_returns is None or period_returns.empty
        curve_unavailable = growth_curve is None or growth_curve.empty
        curve_length = 0 if growth_curve is None else len(growth_curve)

        print(f"Calculating metrics. Portfolio Returns empty: {returns_unavailable}")
        print(f"Calculating metrics. Cum Returns empty: {curve_unavailable}")
        print(f"Calculating metrics. Cum Returns length: {curve_length}")

        # Guard: nothing usable was produced by the backtest.
        if returns_unavailable or curve_length <= 1:
            print("\nNot enough data to calculate performance metrics.")
            self.total_return = self.annualized_return = self.annualized_vol = self.sharpe_ratio = np.nan
            return

        # Only the non-NaN part of the cumulative curve is meaningful.
        valid_curve = growth_curve.dropna()
        if len(valid_curve) <= 1:
            print("\nNot enough data points to calculate performance metrics.")
            self.total_return = self.annualized_return = self.annualized_vol = self.sharpe_ratio = np.nan
            return

        self.total_return = valid_curve.iloc[-1] - 1

        # The holding period is measured in observed (non-NaN) return rows.
        trading_day_count = len(period_returns.dropna())
        if trading_day_count <= 0:
            self.total_return = self.annualized_return = self.annualized_vol = self.sharpe_ratio = np.nan
            print("\nNot enough trading days in the backtest period to calculate annualized metrics.")
            return

        periods_per_year_ratio = 252 / trading_day_count
        self.annualized_return = (1 + self.total_return)**periods_per_year_ratio - 1
        # Volatility comes from the per-period returns, scaled to a year.
        self.annualized_vol = period_returns.std() * np.sqrt(252)

        if self.annualized_vol == 0 or pd.isna(self.annualized_vol):
            # Avoid dividing by a zero or undefined volatility.
            self.sharpe_ratio = 0.0
        else:
            self.sharpe_ratio = self.annualized_return / self.annualized_vol

    def set_starting_amount(self, starting_amount):
        """Store the starting capital that display/plot methods read from ``self.starting_amount``."""
        self.starting_amount = starting_amount

    def display_performance_metrics(self, starting_amount=None):
        """
        Make sure the stored performance metrics are ready to be shown.

        Nothing is printed here; rendering happens in the Streamlit script. The
        method only re-runs ``calculate_performance_metrics`` when any of the four
        stored metrics is NaN.

        Args:
            starting_amount: Optional capital to scale by. When omitted,
                ``self.starting_amount`` is read instead. The resolved value is
                not used further inside this method.
        """
        if starting_amount is None:
            display_amount = self.starting_amount
        else:
            display_amount = starting_amount

        stored_metrics = [
            self.total_return,
            self.annualized_return,
            self.annualized_vol,
            self.sharpe_ratio,
        ]
        metrics_incomplete = any(pd.isna(stored_metrics))
        if metrics_incomplete:
            # At least one metric is missing: (re)compute all of them.
            self.calculate_performance_metrics()

    def plot_cum_returns(self):
        """
        Plot the cumulative-return curve scaled by ``self.starting_amount`` with matplotlib.

        Does nothing when ``self.cum_returns`` is None or empty.
        """
        curve = self.cum_returns
        if curve is None or curve.empty:
            # Nothing to draw; user-facing messaging was moved to the Streamlit logic.
            return

        plt.figure(figsize=(12, 6))
        start_capital = self.starting_amount
        # Convert growth-of-1 into growth of the chosen starting capital.
        scaled_curve = curve * start_capital
        scaled_curve.plot()
        plt.title(f'Point-in-Time Strategy Cumulative Returns (Starting Amount: ${start_capital})')
        plt.ylabel(f'Growth of ${start_capital}')
        plt.grid()
        plt.show()


# --- Helper functions for Streamlit UI interactions ---

@st.cache_data  # Memoize the download and reshaping across Streamlit reruns
def load_data(data_years, subset_tickers, debug_mode):
    """
    Fetch the S&P 500 ticker list and a close-price table for a subset of it.

    Args:
        data_years: Look-back window in years (counted as 365-day years back from today).
        subset_tickers: How many tickers to keep, taken from the head of the list
            returned by ``download_sp500_tickers``.
        debug_mode: Forwarded to ``download_sp500_tickers`` as ``debug``.

    Returns:
        ``(close_prices, tickers)`` where ``close_prices`` has one column per
        ticker and ``tickers`` is the list that was requested, or ``(None, None)``
        when the ticker table or the price download comes back empty.
    """
    window_end = datetime.today()
    window_start = window_end - timedelta(days=365 * data_years)

    constituents = download_sp500_tickers(debug=debug_mode)
    if constituents.empty:
        return None, None

    # Keep only the first `subset_tickers` symbols.
    selected = constituents['Ticker'].tolist()[:subset_tickers]

    raw_prices = download_price_data(selected, window_start, window_end)
    if raw_prices.empty:
        return None, None

    if not isinstance(raw_prices.columns, pd.MultiIndex):
        # Flat columns: treated as the close series of a single ticker.
        # NOTE(review): assumes exactly one column here; confirm against download_price_data.
        if not selected:
            return None, None  # Not expected when the ticker table is non-empty
        close_prices = pd.DataFrame(raw_prices)
        close_prices.columns = [selected[0]]
        return close_prices, selected

    # MultiIndex columns: pick the 'Close' field for every requested ticker,
    # then flatten to plain ticker names by dropping the 'Price' level.
    close_prices = raw_prices.loc[:, (selected, 'Close')]
    close_prices.columns = close_prices.columns.droplevel('Price')
    return close_prices, selected


# --- Main Streamlit App Logic ---

# Page title. Streamlit renders elements in statement order, so this appears first.
st.title('Stock Backtesting Strategy')

# Parameter section: every widget below lives inside a collapsible expander.
# The variables assigned inside the `with` block remain module-level names and
# are read later by the 'Run Backtest' button handler.
st.header('Backtesting Parameters')
with st.expander("Adjust Parameters"):
    st.write("Configure the parameters for the backtesting simulation.")

    # Debug toggle (on by default); passed to load_data as `debug_mode`.
    debug_mode = st.checkbox('Debug Mode (Load S&P 500 tickers from storage)', value=True, help="If checked, attempt to load S&P 500 tickers from a local file cache first.")


    # Train/test split ratio; passed to Backtester as `split_ratio`.
    split_ratio_input = st.number_input(
        'Train/Test Split Ratio',
        min_value=0.1,
        max_value=0.9,
        value=0.7,
        step=0.05,
        help="Ratio of data to use for training (e.g., 0.7 for 70% train, 30% test)"
    )

    # Years of price history; passed to load_data as `data_years`.
    data_years_input = st.number_input(
        'Number of Years of Data',
        min_value=1,
        max_value=10,
        value=2,
        step=1,
        help="Number of years of historical data to download"
    )

    # Portfolio size; passed to Backtester as `top_n_stocks`.
    top_n_stocks_input = st.number_input(
        'Top N Stocks to Select',
        min_value=1,
        max_value=50,
        value=5,
        step=1,
        help="Number of top-ranked stocks to include in the portfolio"
    )

    # Universe size; passed to load_data as `subset_tickers` (used there as a list slice bound).
    subset_tickers_input = st.number_input(
        'Number of S&P 500 Tickers to Consider',
        min_value=10,
        max_value=500, # Upper bound is hard-coded; ideally the number of tickers actually available
        value=50,
        step=10,
        help="Number of largest S&P 500 tickers to consider for the backtest."
    )


# Run the whole pipeline only on the script rerun triggered by clicking the button.
if st.button('Run Backtest'):
    # --- Data loading (memoized by load_data's Streamlit cache) ---
    with st.spinner(f"Loading data for {data_years_input} years..."):
        price_data_close, tickers_to_backtest = load_data(data_years_input, subset_tickers_input, debug_mode)

    # load_data signals failure with (None, None); also guard against empty results.
    if price_data_close is None or tickers_to_backtest is None or price_data_close.empty or not tickers_to_backtest:
        st.error("Failed to load necessary data. Please check your parameters and try again.")
        st.stop()

    # --- Backtest ---
    backtester = Backtester(
        price_data_close=price_data_close,
        subset_tickers=tickers_to_backtest,
        split_ratio=split_ratio_input,
        top_n_stocks=top_n_stocks_input
    )

    with st.spinner("Running backtest..."):
        backtester.run_backtest()

    # st.stop() ends this script run, so everything below can rely on a
    # non-empty cumulative-return series.
    if backtester.cum_returns is None or backtester.cum_returns.empty:
        st.warning("Backtest did not produce cumulative returns. Check data availability and parameters.")
        st.stop()

    backtester.calculate_performance_metrics()

    # --- Benchmark: S&P 500 index (^GSPC) over the portfolio's test period ---
    spy_ticker = '^GSPC'

    # Defaults used whenever the benchmark is unavailable: the portfolio curve on
    # its own index and an all-NaN benchmark curve of the same length.
    benchmark_returns = pd.Series(dtype=float)
    benchmark_cum_returns = pd.Series(dtype=float)
    cum_returns_aligned = backtester.cum_returns.copy()
    benchmark_cum_returns_aligned = pd.Series(index=backtester.cum_returns.index, dtype=float)

    try:
        with st.spinner("Downloading benchmark data..."):
            test_period_start = backtester.cum_returns.index[0]
            # yfinance treats `end` as exclusive; step one day past the last
            # portfolio date so the final test day is included in the benchmark.
            test_period_end = backtester.cum_returns.index[-1] + pd.Timedelta(days=1)

            benchmark_data = yf.download(spy_ticker, start=test_period_start, end=test_period_end, auto_adjust=True, progress=False)

        if benchmark_data is not None and not benchmark_data.empty:
            benchmark_close = benchmark_data['Close']
            if isinstance(benchmark_close, pd.DataFrame):
                # Depending on the yfinance version, a single-ticker download can
                # come back with MultiIndex columns, making 'Close' a one-column
                # frame. Reduce it to a Series so everything below is 1-D.
                benchmark_close = benchmark_close.iloc[:, 0]

            benchmark_returns = benchmark_close.pct_change().fillna(0)
            benchmark_cum_returns = (1 + benchmark_returns).cumprod()

            # Put both curves on a shared index, carrying the last known value
            # forward over dates that only one of them has.
            common_index = backtester.cum_returns.index.union(benchmark_cum_returns.index)
            cum_returns_aligned = backtester.cum_returns.reindex(common_index).ffill()
            benchmark_cum_returns_aligned = benchmark_cum_returns.reindex(common_index).ffill()
        else:
            st.warning("Benchmark data download failed. Skipping benchmark in plots and metrics.")

    except Exception as e:
        # Best effort: the benchmark is optional, so report and fall back to the
        # portfolio-only defaults instead of aborting the page.
        st.warning(f"Could not download benchmark data or align: {e}. Skipping benchmark.")
        benchmark_returns = pd.Series(dtype=float)
        benchmark_cum_returns = pd.Series(dtype=float)
        cum_returns_aligned = backtester.cum_returns.copy()
        benchmark_cum_returns_aligned = pd.Series(index=backtester.cum_returns.index, dtype=float)

    # --- Results Visualization (Interactive Graph) ---
    st.header('Cumulative Returns')
    st.write("Visualize the cumulative returns of the selected portfolio vs. a benchmark.")

    if cum_returns_aligned is not None and not cum_returns_aligned.empty:
        plot_df = pd.DataFrame({
            'Date': cum_returns_aligned.index,
            'My Portfolio': cum_returns_aligned.values
        })

        # Add the benchmark column only when it carries at least one real value.
        if not benchmark_cum_returns_aligned.dropna().empty:
            plot_df['S&P 500'] = benchmark_cum_returns_aligned.values

        if not plot_df.empty:
            # Long format: one row per (Date, Strategy), as expected by px.line's `color`.
            plot_df_melted = plot_df.melt(
                'Date',
                var_name='Strategy',
                value_name='Cumulative Returns'
            )

            fig = px.line(
                plot_df_melted,
                x='Date',
                y='Cumulative Returns',
                color='Strategy',
                title='Cumulative Return: My Portfolio vs S&P 500'
            )

            st.plotly_chart(fig, use_container_width=True)
        else:
            st.warning("No valid data points to plot.")

    else:
        st.warning("No cumulative returns data to plot.")

    # --- Performance Metrics ---
    st.header('Performance Metrics')
    st.write("Key performance indicators for the backtested strategy.")

    if backtester.portfolio_returns is not None and not backtester.portfolio_returns.empty and backtester.cum_returns is not None and len(backtester.cum_returns) > 1:
        col1, col2 = st.columns(2)

        # --- Portfolio metrics (already computed and stored on the backtester) ---
        with col1:
            st.subheader("My Portfolio")
            # Each metric is shown only if it could be computed (not NaN).
            if not pd.isna(backtester.total_return):
                st.metric(label="Total Return", value=f"{backtester.total_return:.2%}")
                # Illustrative end value for a fixed $10,000 starting amount.
                final_value = 10000 * (1 + backtester.total_return)
                st.metric(label="Final Portfolio Value ($10k start)", value=f"${final_value:.2f}")

            if not pd.isna(backtester.annualized_return):
                st.metric(label="Annualized Return", value=f"{backtester.annualized_return:.2%}")
            if not pd.isna(backtester.annualized_vol):
                st.metric(label="Annualized Volatility", value=f"{backtester.annualized_vol:.2%}")
            if not pd.isna(backtester.sharpe_ratio):
                st.metric(label="Sharpe Ratio", value=f"{backtester.sharpe_ratio:.2f}")

        # --- Benchmark metrics (computed here from the downloaded series) ---
        with col2:
            st.subheader("S&P 500 (Benchmark)")
            has_benchmark = (
                len(benchmark_cum_returns) > 1
                and not benchmark_returns.empty
                and not benchmark_returns.isnull().all()
            )
            if has_benchmark:
                try:
                    bench_total_return = float(benchmark_cum_returns.iloc[-1] - 1)
                    # Annualize over the benchmark's own number of rows
                    # (non-zero here because has_benchmark is True).
                    bench_duration_trading_days = len(benchmark_returns)
                    bench_annualization_factor = 252 / bench_duration_trading_days

                    bench_annualized_return = (1 + bench_total_return)**(bench_annualization_factor) - 1
                    bench_annualized_vol = float(benchmark_returns.std()) * np.sqrt(252)

                    # Zero or NaN volatility yields a Sharpe ratio of 0.0 rather than a division error.
                    bench_sharpe_ratio = bench_annualized_return / bench_annualized_vol if bench_annualized_vol > 0 else 0.0

                    st.metric(label="Total Return", value=f"{bench_total_return:.2%}")
                    st.metric(label="Annualized Return", value=f"{bench_annualized_return:.2%}")
                    st.metric(label="Annualized Volatility", value=f"{bench_annualized_vol:.2%}")
                    st.metric(label="Sharpe Ratio", value=f"{bench_sharpe_ratio:.2f}")
                except Exception as e:
                    st.error(f"Error calculating benchmark metrics: {e}")
            else:
                st.warning("Not enough benchmark data to calculate metrics.")
    else:
        st.warning("No performance metrics to display. Insufficient data from backtest.")

## Run streamlit app

### Subtask:
Start the Streamlit application using `streamlit run app.py`.


**Reasoning**:
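The Streamlit application code has been written to `app.py`. The next step is to stop any previous Streamlit/ngrok processes, start the Streamlit server in the background, and expose port 8501 through an ngrok tunnel so the app can be reached from outside Colab.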



In [None]:
# This script will kill the old processes and start a new one.
from pyngrok import ngrok
from google.colab import userdata

# Step 1: Kill any running Streamlit process
# This is the crucial step you were missing.
print("Attempting to kill previous Streamlit process...")
!kill $(ps -ef | grep "streamlit" | grep -v "grep" | awk '{print $2}')
print("Kill command sent.")

# Step 2: Disconnect all ngrok tunnels and kill the ngrok process
print("\nDisconnecting all ngrok tunnels...")
ngrok.kill()

# Step 3: Relaunch everything
print("Setting up new ngrok tunnel...")
ngrok_token = userdata.get('NGROK_API_KEY')
if ngrok_token:
    ngrok.set_auth_token(ngrok_token)
else:
    print("WARNING: NGROK_API_KEY not found in Colab Secrets.")

# Open the tunnel
public_url = ngrok.connect(8501)
print(f"New Public URL: {public_url}")

# Run the updated app.py in the background
print("Starting new Streamlit app...")
!streamlit run app.py &>/dev/null&
print("App is running in the background. Please use the new public URL.")