In [1]:
import requests
import pandas as pd
import numpy as np
import yfinance as yf
import tensorflow as tf
from bs4 import BeautifulSoup
import random
from sklearn.preprocessing import StandardScaler

In [2]:
def get_headers():
    return random.choice([
        {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36"},
        {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'},
        {'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36'}
    ])

# Stock class for stock data
class Stock:
    def __init__(self, ticker, sector):
        self.ticker = ticker
        self.sector = sector
        self.price = 0.0
        self.priceurl = f'https://finance.yahoo.com/quote/{self.ticker}'
        self.pricehistory = pd.DataFrame()
        self.data = {}
        self.dataurl = f"https://finance.yahoo.com/quote/{self.ticker}/key-statistics?p={self.ticker}"
        
        # Deep Learning Attributes
        self.technical_indicators = pd.DataFrame()
        self.today_technical_indicators = pd.DataFrame()
        self.labels = pd.DataFrame()
        self.prediction = 0.0
        
        # Metrics
        self.metrics = {}
        self.metric_aliases = {
            'Market Cap (intraday)': 'market_cap',
            'Beta (5Y Monthly)': 'beta',
            '52 Week High 3': '52_week_high',
            '52 Week Low 3': '52_week_low',
            '50-Day Moving Average 3': '50_day_ma',
            '200-Day Moving Average 3': '200_day_ma',
            'Avg Vol (3 month) 3': 'avg_vol_3m',
            'Avg Vol (10 day) 3': 'avg_vol_10d',
            'Shares Outstanding 5': 'shares_outstanding',
            'Float 8': 'float',
            '% Held by Insiders 1': 'held_by_insiders',
            '% Held by Institutions 1': 'held_by_institutions',
            'Short Ratio (Jan 30, 2023) 4': 'short_ratio',
            'Payout Ratio 4': 'payout_ratio',
            'Profit Margin': 'profit_margin',
            'Operating Margin (ttm)': 'operating_margin',
            'Return on Assets (ttm)': 'return_on_assets',
            'Return on Equity (ttm)': 'return_on_equity',
            'Revenue (ttm)': 'revenue',
            'Revenue Per Share (ttm)': 'revenue_per_share',
            'Gross Profit (ttm)': 'gross_profit',
            'EBITDA ': 'ebitda',
            'Net Income Avi to Common (ttm)': 'net_income',
            'Diluted EPS (ttm)': 'eps',
            'Total Cash (mrq)': 'total_cash',
            'Total Cash Per Share (mrq)': 'cash_per_share',
            'Total Debt (mrq)': 'total_debt',
            'Total Debt/Equity (mrq)': 'debt_to_equity',
            'Current Ratio (mrq)': 'current_ratio',
            'Book Value Per Share (mrq)': 'book_value_per_share',
            'Operating Cash Flow (ttm)': 'operating_cash_flow',
            'Levered Free Cash Flow (ttm)': 'levered_free_cash_flow'
        }
        
    def scrape_data(self):
        #get fundamental info of the firm, saved in stock.data as dictionary
        #yf.Ticker(self.ticker).info.keys()
        page = requests.get(self.dataurl, headers = get_headers())
        soup = BeautifulSoup(page.content, 'html.parser')
        data = {}
        sections = soup.find_all('section', {'data-test': 'qsp-statistics'})
        for section in sections:
            rows = section.find_all('tr')
            for row in rows:
                cols = row.find_all('td')
                if len(cols) == 2:
                    metric = cols[0].text.strip()
                    if metric in self.metric_aliases:
                        data[self.metric_aliases[metric]] = cols[1].text.strip()

        self.data = data
    
    def get_stock_price(self):
        # grab latest price/close price
        try:
            response = requests.get(self.priceurl, headers = get_headers())
            soup = BeautifulSoup(response.content, 'html.parser')
            data = soup.find('fin-streamer', {'data-symbol': self.ticker})
            price = float(data['value'])
            self.price = price

        except:
            print(f'Price not available for {self.ticker}')
            self.price = 0.0
            
    def get_historical(self):
        # grab price history saved as dataframe
        stock = yf.Ticker(self.ticker)
        history = stock.history(start = '2010-01-01', end='2023-02-28')
        self.pricehistory = history
        
        
    def add_technical_indicators(self):
        # get historical stock prices
        prices = self.pricehistory
        if len(prices) < 20:
            return
        
        # calculate 20-day moving average
        prices['MA20'] = prices['Close'].rolling(window=20).mean()
        
        # calculate 50-day moving average
        prices['MA50'] = prices['Close'].rolling(window=50).mean()
        
        # calculate relative strength index (RSI), window = 14 days
        delta = prices['Close'].diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        avg_gain = gain.rolling(window=14).mean()
        avg_loss = loss.rolling(window=14).mean()
        rs = avg_gain / avg_loss
        prices['RSI'] = 100 - (100 / (1 + rs))
        
        # calculate moving average convergence divergence (MACD)
        exp1 = prices['Close'].ewm(span=12, adjust=False).mean()
        exp2 = prices['Close'].ewm(span=26, adjust=False).mean()
        macd = exp1 - exp2
        signal = macd.ewm(span=9, adjust=False).mean()
        prices['MACD'] = macd - signal
        
        # calculate Bollinger Bands, window = 20 days
        prices['20MA'] = prices['Close'].rolling(window=20).mean()
        prices['20STD'] = prices['Close'].rolling(window=20).std()
        prices['UpperBand'] = prices['20MA'] + (prices['20STD'] * 2)
        prices['LowerBand'] = prices['20MA'] - (prices['20STD'] * 2)

        # Features for deep learning model, train_set exclude last 10 days technical indicator
        train_data_aux = prices[['Close', 'MA20', 'MA50', 'RSI', 'MACD', 'UpperBand', 'LowerBand']].dropna()
        self.technical_indicators = train_data_aux.iloc[:-10, :].drop('Close', axis=1) 

        # Set label as profit/loss of 10 day future price from actual price, use last 10 days to label
        labels_aux = (train_data_aux['Close'].shift(-10) > train_data_aux['Close']).astype(int)
        self.labels =  labels_aux[:-10]

        # Today features for prediction
        self.today_technical_indicators = prices[['MA20', 'MA50', 'RSI', 'MACD', 'UpperBand', 'LowerBand']].iloc[-1,:] 

        prices = prices.reset_index()
        # store technical indicators in stock data dictionary
        self.data.update(prices[['Date', 'MA20', 'MA50', 'RSI', 'MACD', 'UpperBand', 'LowerBand']].to_dict('list'))   
                

In [3]:
# Stocks Screener Class, store stocks in stock class as a list
class StockScreener:
    def __init__(self, stocks):
        self.stocks = stocks
        self.scaler = StandardScaler()
        self.models = {}

    # Add data to stocks, can be used for filter
    def add_data(self):
        i = 1
        for stock in self.stocks:
            stock.scrape_data()
            stock.get_stock_price()
            print(i)
            i += 1

    # Select stocks list that pass all filters
    def apply_filters(self, filters):
        filtered_stocks = []
        for stock in self.stocks:
            passed_all_filters = True
            for filter_func in filters:
                if not filter_func(stock):
                    passed_all_filters = False
                    break
            if passed_all_filters:
                filtered_stocks.append(stock)
        return filtered_stocks
    
    #Add deep learning related data to selected stocks (passed all filter or specified), pass a list of stock class
    def prepare_new_stocks(self, new_stocks):
        for stock in new_stocks:
            stock.get_historical()
            stock.add_technical_indicators()
            
    # Train deep learning models on selected stocks, supervised learning, use technical_indicators to predict >0 profit
    def train_models(self, new_stocks):
        for stock in new_stocks:
            train_data = stock.technical_indicators
            train_labels = stock.labels

            # Normalize the data
            train_data = self.scaler.fit_transform(train_data)
            train_labels = np.array(train_labels)

            # Create and train model, add stock's model to screener models dictionary
            model = create_model(train_data)
            model.fit(train_data, train_labels, epochs=10)
            self.models[stock.ticker] = model
    
    # Predict whether new stocks will pass filters
    def predict_stocks(self, new_stocks):
        # Add technical indicators to new stocks, generate models
        self.prepare_new_stocks(new_stocks)
        self.train_models(new_stocks)
        
        # Make predictions for each stock using its corresponding model
        predicted_stocks = []
        for stock in new_stocks:
            if stock.ticker in self.models:
                #grab stock's model
                model = self.models[stock.ticker]
                # Reshape as there is only one sample,use today's technical indicator to predict future 10 days profit
                new_features_aux = np.array(stock.today_technical_indicators).reshape(1, -1)
                new_stock_data = self.scaler.fit_transform(new_features_aux)
                prediction = model.predict(new_stock_data)
                stock.prediction = prediction
                if prediction > 0.6: #0.5
                    predicted_stocks.append(stock)

        return predicted_stocks

In [4]:
# Simple Dense model 
def create_model(train_data):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Dense(64, input_shape=(train_data.shape[1],), activation='relu'),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

In [5]:
def filter_sector(stock, sector):
    return stock.sector == sector

def filter_price(stock, min_price, max_price):
    return min_price <= stock.price <= max_price

def filter_technical_indicator(stock, indicator_name, operator, value):
    if indicator_name not in stock.today_technical_indicators:
        return False

    # Obtain the value of the technical indicator
    indicator_value = stock.today_technical_indicators[indicator_name]
    
    # Check if the value is 'price':
    if value == 'price':
        value = float(stock.price)
    else:
        value = float(value)
    
    # Compare according to operator
    if operator == '>':
        return float(indicator_value) > value
    elif operator == '>=':
        return float(indicator_value) >= value
    elif operator == '<':
        return float(indicator_value) < value
    elif operator == '<=':
        return float(indicator_value) <= value
    elif operator == '==':
        return float(indicator_value) == value
    else:
        return False
       
def filter_metric(stock, metric, operator, value):
    if metric not in stock.data:
        print('hola')
        return False

    # Convert value to same units as metric, if necessary
    if 'B' in stock.data[metric]:
        stock.data[metric] = stock.data[metric].replace('B', '')
        value = float(value) / 1e9
    elif 'M' in stock.data[metric]:
        stock.data[metric] = stock.data[metric].replace('M', '')
        value = float(value) / 1e6
    elif '%' in stock.data[metric]:
        stock.data[metric] = stock.data[metric].replace('%', '')
        value = float(value)
    else:
        value = float(value)

    # Check condition according to operator
    if operator == '>':
        return float(stock.data[metric]) > value
    elif operator == '>=':
        return float(stock.data[metric]) >= value
    elif operator == '<':
        return float(stock.data[metric]) < value
    elif operator == '<=':
        return float(stock.data[metric]) <= value
    elif operator == '==':
        return float(stock.data[metric]) == value
    else:
        return False

In [6]:
# Get sp500 ticker and sector
url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')

table = soup.find('table', {'class': 'wikitable sortable'})
rows = table.find_all('tr')[1:]  # skip the header row

sp500 = []

for row in rows:
    cells = row.find_all('td')
    ticker = cells[0].text.strip()
    company = cells[1].text.strip()
    sector = cells[3].text.strip()
    sp500.append({'ticker': ticker, 'company': company, 'sector': sector})

In [7]:
# make dict of stock to stock class
def stockuniverse(stocklist):
    stockuniverse = [Stock(stock['ticker'], stock['sector']) for stock in stocklist]
    return stockuniverse

In [8]:
# Get sp500 tickers and sectors, use first 10 stocks
sp500_stocks = stockuniverse(sp500[:10])
# Screener
screener = StockScreener(sp500_stocks)
# Add Data
screener.add_data()

# Run screener for all sp500 tickers
filters = [#lambda stock: filter_sector(stock, 'Industrial Conglomerates'),
           #lambda stock: filter_price(stock, 60, 200),
           #lambda stock: filter_metric(stock, 'profit_margin', '>', 15),
           #lambda stock: filter_technical_indicator(stock, 'UpperBand', '>', 'price'),
           #lambda stock: filter_technical_indicator(stock, 'LowerBand', '<', 'price'),
]

# Apply Filters
filtered_stocks = screener.apply_filters(filters)

# Make predictions, grab/analyze past historical price for filtered_stocks
predicted_stocks = screener.predict_stocks(filtered_stocks)

final_list = []
for predicted_stock in predicted_stocks:
    final_list.append(predicted_stock.ticker)
final_list

1
2
3
4
5
6
7
8
9
10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10


Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


['AOS', 'ABT', 'ACN', 'ATVI', 'ADBE', 'AAP']

In [9]:
#Original list: sp500 -> sp500_stocks -> screener.stocks (store stock class as a list)
#Individual stock: screener.stocks[0], screener.stocks[1]
#Individual stock original stats: screener.stocks[0].ticker, screener.stocks[0].sector
#Individual stock later added for filter: screener.stocks[0].data, screener.stocks[0].price, screener.stocks[0].metrics
#screener.stocks[0].technical_indicators

In [10]:
#improvements: use Eikon or other source to grab stocks' info quickly