In [102]:
# import necessary libraries
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from datetime import datetime, timedelta

In [103]:
# Global cache to store downloaded stock data.
# Shared module-level dict: the functions below read from and write to it so
# that repeated calls within one kernel session can reuse earlier results.
# No code visible in this notebook evicts entries once they are stored.
stock_data_cache = {}

def sp500_stocks():
    """Pick the five S&P 500 tickers whose beta lies furthest from 1.

    The constituent table is read from Wikipedia and each ticker's ``beta`` is
    looked up through ``yf.Ticker(...).info``. A ticker whose beta is missing,
    ``None``, non-numeric or NaN is treated as beta 1 (zero deviation). The
    selection is stored under ``stock_data_cache['sp500']`` and returned
    directly on later calls.

    Returns:
        list[str]: up to five ticker symbols, largest ``abs(1 - beta)`` first,
        or an empty list when the constituent table cannot be loaded.
    """
    if 'sp500' in stock_data_cache:  # Check cache first
        return stock_data_cache['sp500']

    try:
        sp500_table = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')[0]
        # Wikipedia writes share classes with a dot (BRK.B) whereas Yahoo
        # Finance symbols use a dash (BRK-B).
        sp500_tickers = [str(symbol).replace('.', '-') for symbol in sp500_table['Symbol']]
    except Exception as e:
        print(f"An unknown error occurred: {e}")
        return []

    volatilities = {}
    for ticker in sp500_tickers:
        try:
            # float() rejects None and non-numeric values, sending them to the
            # neutral default below together with lookup failures.
            beta = float(yf.Ticker(ticker).info.get('beta'))
        except Exception:
            beta = float('nan')
        volatilities[ticker] = 1 if pd.isna(beta) else beta

    # Ascending by distance from 1, then reversed, so the most extreme betas
    # come first.
    by_deviation = sorted(volatilities, key=lambda symbol: abs(1 - volatilities[symbol]))
    vol_ticker = by_deviation[::-1][:5]

    stock_data_cache['sp500'] = vol_ticker  # Cache the result
    return vol_ticker

In [104]:
# 2. Valuate Stock using DCF
# Pre-fetch treasury yield and market data once, when this cell runs, so the
# valuation helpers below can read them as module-level globals.
# NOTE(review): these are yfinance calls executed at cell run time; presumably
# they need network access — confirm before running offline.
treasury_ticker = yf.Ticker('^TNX')
market_ticker = yf.Ticker('^GSPC')
# Last available close of ^TNX divided by 100 (presumably the index is quoted
# in percent — verify); read by cost_of_equity().
risk_free_rate = treasury_ticker.history(period='max')['Close'].iloc[-1] / 100
# Full close-price history of ^GSPC; cost_of_equity() slices its last five years.
market_history = market_ticker.history(period='max')['Close']


def fetch_stock_data(ticker):
    """Fetch (and cache) the Yahoo Finance data needed for a DCF valuation.

    Args:
        ticker: ticker symbol passed to ``yf.Ticker``.

    Returns:
        tuple: ``(info, financials, income_statement, balance_sheet,
        cash_flow, dividends)``. On any failure a message is printed and a
        tuple of six ``None`` values is returned instead; failures are not
        cached, so a later call retries.
    """
    if ticker in stock_data_cache:
        return stock_data_cache[ticker]

    try:
        stock = yf.Ticker(ticker)
        bundle = (
            stock.info,
            stock.financials,
            stock.income_stmt,
            stock.balance_sheet,
            stock.cash_flow,
            stock.dividends,
        )
    except Exception as e:
        # `yf.YFinanceError` is not guaranteed to exist; naming it directly in
        # an `except` clause would raise AttributeError while handling the
        # real error. Resolve it defensively instead.
        yf_error = getattr(yf, 'YFinanceError', None)
        if isinstance(yf_error, type) and isinstance(e, yf_error):
            print(f"Yahoo Finance Error for {ticker}: {e}")
        else:
            print(f"An unknown error occurred for {ticker}: {e}")
        return None, None, None, None, None, None

    stock_data_cache[ticker] = bundle
    return bundle


# Function to calculate discount rate


def discount_rate(stock_data):
    """Weighted average cost of capital (WACC) for one company.

    Args:
        stock_data: the 6-tuple produced by ``fetch_stock_data``; ``info``,
            ``financials`` and ``balance_sheet`` are used here.

    Returns:
        float: ``(E/V) * Re + (D/V) * Rd * (1 - Tc)`` with ``E`` the market
        cap, ``D`` total debt, ``Re`` from ``cost_of_equity`` and ``Rd`` the
        pre-tax interest expense over debt. With no debt this reduces to
        ``Re``.
    """
    info, financials, _, balance_sheet, _, _ = stock_data
    Tc = 0.21

    E = info['marketCap']
    # Read the first column, like every other statement lookup in this file,
    # so debt and interest expense come from the same reporting period.
    D = balance_sheet.loc['Total Debt'].iloc[0]
    if pd.isna(D):
        D = 0

    Re = cost_of_equity(stock_data)
    if D == 0:
        # All-equity capital structure: avoids dividing interest by zero debt.
        return Re

    V = E + D
    Interest_Expense = financials.loc['Interest Expense'].iloc[0]
    if pd.isna(Interest_Expense):
        Interest_Expense = 0

    # Pre-tax cost of debt; the tax shield is applied exactly once, below.
    Rd = Interest_Expense / D

    return (E / V) * Re + (D / V) * Rd * (1 - Tc)

# Function to calculate cost of equity


def cost_of_equity(stock_data):
    """Cost of equity as risk-free rate plus beta times the market premium.

    Reads ``beta`` from the company's ``info`` mapping and the module-level
    ``market_history`` / ``risk_free_rate`` globals. The market return is the
    mean daily percentage change over the five years ending at the last
    ``market_history`` date, compounded over 252 periods.

    Args:
        stock_data: 6-tuple whose first element is the ``info`` mapping.

    Returns:
        ``risk_free_rate + beta * (annualized market return - risk_free_rate)``.
    """
    company_info, _fin, _inc, _bal, _cash, _div = stock_data
    beta = company_info['beta']

    # Five-year window ending at the most recent market observation.
    window_end = market_history.index[-1]
    window_start = window_end - pd.DateOffset(years=5)
    window_returns = market_history[window_start:window_end].pct_change().dropna()

    # Compound the average daily move over 252 periods.
    market_return = (1 + window_returns.mean()) ** 252 - 1
    market_premium = market_return - risk_free_rate

    return risk_free_rate + beta * market_premium

# Function to get Free Cash Flow


def get_FCF(stock_data, tax_rate=0.21):
    """Free cash flow to the firm from the first column of the statements.

    Computed as ``CFO + interest expense * (1 - tax_rate) - |capital
    expenditure|``.

    Args:
        stock_data: 6-tuple as produced by ``fetch_stock_data``; only
            ``financials`` (index 1) and ``cash_flow`` (index 4) are read.
        tax_rate: corporate tax rate applied to the interest add-back.
            Defaults to 0.21.

    Returns:
        The free cash flow as a number (NaN if operating cash flow or capital
        expenditure is NaN).
    """
    _, financials, _, _, cash_flow, _ = stock_data
    CFO = cash_flow.loc['Operating Cash Flow'].iloc[0]
    IE = financials.loc['Interest Expense'].iloc[0]
    CAPEX = cash_flow.loc['Capital Expenditure'].iloc[0]

    # A missing interest expense should not turn the whole result into NaN.
    if pd.isna(IE):
        IE = 0.0

    # Capital expenditure is reported as a cash outflow (negative number) in
    # Yahoo cash-flow statements; abs() subtracts its magnitude under either
    # sign convention instead of accidentally adding it back.
    return CFO + IE * (1 - tax_rate) - abs(CAPEX)

# Function to calculate Terminal Value


def Terminal_Value(stock_data):
    """Terminal value ``FCFF * (1 + g) / (r - g)`` of the company.

    ``g`` is the retention rate times the return on invested capital and ``r``
    is the discount rate from ``discount_rate``.

    Args:
        stock_data: 6-tuple as produced by ``fetch_stock_data``.

    Returns:
        The terminal value as a number.

    Raises:
        ValueError: if net income is zero/NaN, or if the growth rate is not
            strictly below the discount rate (the formula is then undefined
            or negative).
    """
    info, _, income_statement, balance_sheet, _, dividends = stock_data
    FCFF = get_FCF(stock_data)

    net_income = income_statement.loc['Net Income'].iloc[0]
    if pd.isna(net_income) or net_income == 0:
        raise ValueError("Net income is zero or missing; retention rate is undefined.")

    # `dividends` holds per-share payments, while net income is a company-wide
    # total. Scale the trailing twelve months of payments by the share count
    # so that both sides of the payout ratio use the same unit.
    if dividends is None or dividends.empty:
        dividend_paid = 0
    else:
        one_year_ago = pd.Timestamp.now(tz=dividends.index.tz) - pd.DateOffset(years=1)
        per_share = dividends[dividends.index > one_year_ago].sum()
        dividend_paid = per_share * info['sharesOutstanding']

    retention_rate = 1 - dividend_paid / net_income

    debt = balance_sheet.loc['Total Debt'].iloc[0]
    equity = info['marketCap']
    # NOTE(review): the numerator is already net of dividends, so retention is
    # effectively applied twice in `g` — confirm this is intended.
    ROIC = (net_income - dividend_paid) / (debt + equity)

    g = retention_rate * ROIC
    r = discount_rate(stock_data)

    if not r > g:
        raise ValueError(
            f"Growth rate ({g}) must be below the discount rate ({r}) for a terminal value."
        )

    return FCFF * (1 + g) / (r - g)

# Main DCF function
def DCF(ticker, forecast_period=5):
    """Implied share price from a discounted-cash-flow valuation.

    The latest free cash flow is held flat for ``forecast_period`` years,
    discounted at the WACC, and added to the discounted terminal value; the
    total is divided by the shares outstanding.

    Args:
        ticker: ticker symbol to value.
        forecast_period: number of years of cash flow to discount. Defaults
            to 5.

    Returns:
        The implied price per share, or ``None`` when the inputs could not be
        fetched or the valuation cannot be computed.
    """
    stock_data = fetch_stock_data(ticker)
    # fetch_stock_data signals failure with a tuple of None values.
    if stock_data is None or any(part is None for part in stock_data):
        return None

    try:
        WACC = discount_rate(stock_data)
        FCFFn = get_FCF(stock_data)
        TV = Terminal_Value(stock_data)
        shares = stock_data[0]['sharesOutstanding']
    except (KeyError, IndexError, TypeError, ValueError, ZeroDivisionError) as err:
        # Missing statement rows or info fields: report and let the caller
        # decide, instead of crashing.
        print(f"DCF inputs unavailable for {ticker}: {err}")
        return None

    if not shares:
        return None

    PV_TV = TV / (1 + WACC) ** forecast_period

    PV_FCFF = sum(FCFFn / (1 + WACC) **
                  i for i in range(1, forecast_period + 1))

    EV = PV_TV + PV_FCFF
    Implied_Share_Price = EV / shares
    return Implied_Share_Price


In [105]:
# 3. Portfolio Optimization
# Tangency Portfolio
def tangency_portfolio(data, rf=0.0):
    """Tangency-portfolio weights for a table of asset returns.

    Solves ``cov @ w = mean - rf`` and rescales ``w`` so the weights sum to 1.

    Args:
        data: DataFrame with one column per asset and one row per observation.
        rf: per-period risk-free rate subtracted from every mean; must use the
            same period as the rows of ``data``. The default 0.0 weights by
            the raw means.

    Returns:
        numpy.ndarray: one weight per column of ``data``, in column order.

    Raises:
        ValueError: if the unnormalised weights sum to zero or are not finite.
        numpy.linalg.LinAlgError: if the covariance matrix is singular.
    """
    # calculates mean/expected (excess) return of each asset in portfolio
    mu = data.mean().to_numpy(dtype=float) - rf

    # covariance matrix of the asset returns, as a plain array so the result
    # type does not depend on how pandas wraps NumPy outputs
    cov = data.cov().to_numpy(dtype=float)

    # Solving the linear system is more stable than forming the explicit inverse.
    raw = np.linalg.solve(cov, mu)

    total = raw.sum()
    if not np.isfinite(total) or total == 0:
        raise ValueError("Tangency weights cannot be normalised (degenerate sum).")

    return raw / total

def allocator(tickers):
    """Portfolio weights for ``tickers`` from daily returns since 2015.

    Adjusted close prices are downloaded (or taken from ``stock_data_cache``),
    converted to daily percentage returns and passed to
    ``tangency_portfolio``.

    Args:
        tickers: iterable of ticker symbols.

    Returns:
        list[float]: one weight per ticker, in the same order as ``tickers``;
        an empty list if anything fails (a message is printed).
    """
    try:
        tickers = list(tickers)
        if not tickers:
            return []

        # Key the cache on the ticker list so a different list is never served
        # another list's prices.
        cache_key = ('portfolio_data', tuple(tickers))
        if cache_key in stock_data_cache:
            data = stock_data_cache[cache_key]
        else:
            # auto_adjust=False keeps the 'Adj Close' column available.
            data = yf.download(
                tickers,
                start="2015-01-01",
                end=datetime.now().strftime('%Y-%m-%d'),
                auto_adjust=False,
            )['Adj Close']
            stock_data_cache[cache_key] = data

        if isinstance(data, pd.Series):
            data = data.to_frame(name=tickers[0])

        # The downloaded columns are not guaranteed to follow the requested
        # order; realign so that weights[i] belongs to tickers[i].
        prices = data[tickers]

        # The optimiser expects returns, not price levels.
        returns = prices.dropna().pct_change().dropna()

        weights = tangency_portfolio(returns)

        return np.asarray(weights).tolist()

    except ValueError as val_err:
        print(f"Value error in portfolio calculation: {val_err}")
    except Exception as e:
        # `yf.YFinanceError` may not exist, so it is resolved defensively
        # rather than named in an `except` clause.
        yf_error = getattr(yf, 'YFinanceError', None)
        if isinstance(yf_error, type) and isinstance(e, yf_error):
            print(f"Yahoo Finance Error for portfolio: {e}")
        else:
            print(f"An unknown error occurred in portfolio: {e}")

    return []



In [106]:
# 4. Valuator
# Function to valuate trade 
def valuator(stock: str, end_date: str = None):
    """Compare a stock's latest adjusted close with its DCF-implied price.

    Args:
        stock: ticker symbol.
        end_date: ``'YYYY-MM-DD'`` end date for the price download; defaults
            to today.

    Returns:
        dict | None: ``{"current_price", "valuated_price", "valuation"}`` with
        both prices truncated to two decimals and ``valuation`` one of
        ``"overvalued"``, ``"undervalued"`` or ``"fairly valued"``. ``None``
        is returned (after printing a message) when the ticker is invalid,
        the DCF cannot be computed, or the price download fails.
    """
    # Use current date if end_date is not provided
    if end_date is None:
        end_date = datetime.now().strftime('%Y-%m-%d')

    # if stock is not a valid ticker, stop here
    if not yf.Ticker(stock).info:
        print('Invalid stock ticker. Try Again!')
        return None

    # Calculate valuated price using DCF method
    valuated_price = DCF(stock)
    if valuated_price is None or pd.isna(valuated_price):
        print('DCF calculation cannot be done.')
        return None

    # Download stock's adjusted close price
    try:
        history = yf.download(
            stock, start='2015-01-01', end=end_date, auto_adjust=False)['Adj Close']
        # The last row may be a scalar or a one-element Series depending on
        # the shape yfinance returns; reduce it to a plain float either way.
        current_price = float(np.ravel(history.dropna().iloc[-1])[0])
    except Exception as e:
        print(f"Failed to download stock data: {e}")
        return None  # Return None so the calling function can handle this

    # Truncate prices to the second decimal place
    truncated_current_price = np.trunc(current_price * 100) / 100
    truncated_valuated_price = np.trunc(valuated_price * 100) / 100

    if truncated_current_price > truncated_valuated_price:
        verdict = "overvalued"
    elif truncated_current_price < truncated_valuated_price:
        verdict = "undervalued"
    else:
        verdict = "fairly valued"

    # return a dictionary with the current price, valuated price, and valuation
    return {
        "current_price": truncated_current_price,
        "valuated_price": truncated_valuated_price,
        "valuation": verdict,
    }

In [111]:
# Trade Executor
class Trader:
    """Opens and adjusts positions based on DCF valuations and portfolio weights.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, initial_capital=10000):
        # Positions keyed by ticker, each with 'shares' and 'total_value'.
        # Starts as an empty dict and becomes a DataFrame (tickers as columns)
        # once a trade method has run.
        self.portfolio = {}
        # Running portfolio value; reset to initial_capital by enter_trade.
        self.portfolio_value = 0
        # Capital that enter_trade distributes across the weights.
        self.initial_capital = initial_capital

    def enter_trade(self, tickers=None, end_date=None):
        """Open positions: short overvalued stocks, buy undervalued ones.

        Args:
            tickers: symbols to trade; defaults to ``sp500_stocks()``. Only
                the first five are used.
            end_date: ``'YYYY-MM-DD'`` valuation date; defaults to today.

        Returns:
            pandas.DataFrame: the portfolio, one column per ticker with rows
            ``shares`` and ``total_value``. Tickers that are fairly valued or
            could not be valued are left out.
        """
        # Use current date if end_date is not provided
        if end_date is None:
            end_date = datetime.now().strftime('%Y-%m-%d')

        if tickers is None:
            tickers = sp500_stocks()

        # if length of tickers is greater than 5, cut it down to the first five
        tickers = list(tickers)[:5]

        # Get list of weights
        weights = allocator(tickers)

        # Get valuation of each stock
        valuation = [valuator(ticker, end_date) for ticker in tickers]

        # set portfolio value to initial capital
        self.portfolio_value = self.initial_capital

        # Work on a plain dict even if an earlier call already turned
        # self.portfolio into a DataFrame.
        positions = pd.DataFrame(self.portfolio).to_dict()

        if len(weights) != len(tickers):
            # allocator() returns [] on failure; nothing can be sized then.
            print('Portfolio weights unavailable; no trades entered.')
        else:
            # Execute initial trades based on valuation
            for ticker, weight, verdict in zip(tickers, weights, valuation):
                if verdict is None:
                    continue  # valuation failed for this ticker
                value = verdict['valuation']
                current_price = verdict['current_price']
                if not current_price:
                    continue  # cannot size a position at a zero price

                num_shares = (weight * self.initial_capital) / current_price

                if value == 'overvalued':
                    positions[ticker] = {'shares': -num_shares, 'total_value': -num_shares * current_price}
                elif value == 'undervalued':
                    positions[ticker] = {'shares': num_shares, 'total_value': num_shares * current_price}

        self.portfolio = pd.DataFrame(positions)
        return self.portfolio

    def exit_trade(self, tickers, end_date=None):
        """Adjust the first held position whose exit condition is met.

        Args:
            tickers: symbols to check.
            end_date: ``'YYYY-MM-DD'`` valuation date; defaults to today.

        Returns:
            pandas.DataFrame: the updated portfolio.
        """
        # Use current date if end_date is not provided
        if end_date is None:
            end_date = datetime.now().strftime('%Y-%m-%d')

        tickers = list(tickers)

        # Get valuation of each stock
        valuation = [valuator(ticker, end_date) for ticker in tickers]

        # Get list of weights
        weights = allocator(tickers)

        # Normalise to a DataFrame so positions can be updated with .loc
        # whether self.portfolio is still a dict or already a DataFrame.
        positions = pd.DataFrame(self.portfolio).copy()

        if len(weights) != len(tickers):
            print('Portfolio weights unavailable; no trades exited.')
            self.portfolio = positions
            return self.portfolio

        # Check for exit condition
        for ticker, weight, verdict in zip(tickers, weights, valuation):
            if verdict is None or ticker not in positions.columns:
                continue  # not valued, or no open position

            value = verdict['valuation']
            current_price = verdict['current_price']
            valuated_price = verdict['valuated_price']
            if not current_price:
                continue

            # Calculate 0.5 standard deviation criteria for exiting trade
            # NOTE(review): the std of a single price is always 0, so this
            # band is currently a no-op. Replace with an appropriate std
            # calculation if needed.
            half_std = 0.5 * np.std(current_price)

            # Check if the exit condition is met and execute trade
            if value == 'overvalued' and valuated_price <= current_price - half_std:
                total_value = positions.loc['total_value', ticker] + current_price * weight
            elif value == 'undervalued' and valuated_price >= current_price + half_std:
                total_value = positions.loc['total_value', ticker] - current_price * weight
            else:
                continue

            positions.loc['total_value', ticker] = total_value
            positions.loc['shares', ticker] = total_value / current_price
            # Only the first qualifying ticker is adjusted per call.
            break

        self.portfolio = positions
        return self.portfolio

    def calculate_portfolio_value(self, tickers, valuation):
        """Update and return ``portfolio_value`` from the held positions.

        For each ticker, the position's ``total_value`` is subtracted when the
        valuation is ``'overvalued'`` and added when it is ``'undervalued'``.
        Tickers without a valuation or without a position are skipped.

        Args:
            tickers: symbols to include.
            valuation: list parallel to ``tickers`` of dicts with a
                ``'valuation'`` key (or ``None``).

        Returns:
            The updated ``self.portfolio_value``.
        """
        for ticker, verdict in zip(tickers, valuation):
            # `in` checks dict keys or DataFrame columns, matching both forms
            # self.portfolio can take.
            if verdict is None or ticker not in self.portfolio:
                continue

            value = verdict['valuation']
            if value == 'overvalued':
                self.portfolio_value -= self.portfolio[ticker]['total_value']
            elif value == 'undervalued':
                self.portfolio_value += self.portfolio[ticker]['total_value']

        return self.portfolio_value