# Creating an LSTM Model for Buy/Sell signal prediction on stock price data
## Contents
1. Download stock price data for all S&P 500 stocks
2. Clean up data
3. Preprocess data for LSTM Model
4. Train Model
5. Test Model
6. Evaluate Model based on backtest with real price data

In [None]:
import numpy as np
import torch
import torch.nn as nn
import datetime as dt
import plotly.graph_objects as go
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset

## Download stock price data for all S&P 500 stocks

We download the ticker data for about 30000 ticker symbols from yahoo finance.
For this we use a csv file containing valid tickers with financial data from https://github.com/ahnazary/Finance/blob/master/finance/src/database/valid_tickers.csv.

We use these tickers as a base for our training and testing data.

In [None]:
import pandas as pd
import dask.dataframe as dd
import yfinance as yf
import logging
from dask.delayed import delayed
from dask.base import compute


def download_data():
    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
    )

    # download data for a single ticker symbol
    @delayed
    def download_ticker_data(ticker):
        try:
            df = yf.download(ticker, start="2015-01-01", end="2024-01-01", progress=False)
            if df is None or df.empty:
                logging.warning(f"No data found for {ticker}")
                return None
            logging.info(f"Downloaded data for {ticker}")
            df = df.reset_index()
            df["Date"] = pd.to_datetime(df["Date"])
            df = df.set_index("Date")
            df["Symbol"] = ticker
            return df
        except Exception as e:
            return None

    # download data for ticker symbols in batches to avoid hitting thread limits
    def run_batches(tickers, batch_size = 500):
        all_batches = []
        for i in range(0, len(tickers), batch_size):
            batch = tickers[i : i + batch_size]
            delayed_data = [download_ticker_data(t) for t in batch]
            logging.info(f"Number of tickers in queue: {len(delayed_data)}")
            results = compute(*delayed_data, scheduler = "threads")
            valid_results = [
                dd.from_pandas(r, npartitions=1)
                for r in results
                if r is not None and isinstance(r, dd.DataFrame)
            ]
            valid_results = [
                dd.from_pandas(r, npartitions=1)
                for r in valid_results
                if r.npartitions > 0
            ]
            if valid_results:
                batch_df = dd.concat(valid_results)
                all_batches.append(batch_df)
            logging.info(f"Batch {i // batch_size + 1} completed.")
        if all_batches == []:
            logging.error("No valid data found in any batch.")
            return None
        return dd.concat(all_batches)

    # Read ticker data from CSV
    filepath = "../data/ticker_Symbols_yfinance.csv"
    tickers = dd.read_csv(
        filepath,
        dtype={
            "market_cap": "float64",
            "total_assets": "float64",
            "total_revenue": "float64",
        },
    )
    tickers = tickers["ticker"].compute().tolist()
    tickers = list(set(tickers))
    logging.info(f"Number of tickers: {len(tickers)}")


    # Check if tickers list is not empty
    if not tickers:
        logging.error("Ticker list is empty. Please check the input file.")
    else:
        # Download data for each ticker in parallel using delayed
        stock_data = run_batches(tickers, batch_size=500)

        if stock_data is not None and stock_data.npartitions > 0:
            stock_data = stock_data.sort_values(["Symbol", "Date"])
            logging.info("All tickers downloaded.")
            logging.info(f"stock_data.npartitions: {stock_data.npartitions}")
            
            stock_data = stock_data.sort_values(["ticker", "Date"])
            stock_data.to_csv("../data/stock_data.csv", index = False, single_file=True)
            logging.info("Data saved to CSV file.")
            return stock_data
        else:
            logging.error("Dask DataFrame is empty or not properly formed.")

In [42]:
filepath = "../data/sp500_stocks.csv"

# check if file is already available
try:
    stock_data = dd.read_csv(filepath)
    stock_data["Date"] = stock_data["Date"].map_partitions(dd.to_datetime, errors='coerce')
    stock_data.set_index("Date", inplace=True)
    stock_data.sort_values(by=["Symbol", "Date"], inplace=True)
    stock_data = stock_data.compute()
    print(stock_data.head())
    logging.info("Data already available, skipping download.")
except FileNotFoundError:
    stock_data = download_data()
    if stock_data is not None:
        print(stock_data.head())
    logging.info("Data not available, please download it first.")

2025-05-04 18:35:16,170 - INFO - Data already available, skipping download.


        Date Symbol  Adj Close      Close       High        Low       Open  \
0 2018-01-02      A  64.625580  67.599998  67.889999  67.339996  67.419998   
1 2018-01-03      A  66.269882  69.320000  69.489998  67.599998  67.620003   
2 2018-01-04      A  65.772766  68.800003  69.820000  68.779999  69.540001   
3 2018-01-05      A  66.824364  69.900002  70.099998  68.730003  68.730003   
4 2018-01-08      A  66.967758  70.050003  70.330002  69.550003  69.730003   

      Volume  
0  1047800.0  
1  1698900.0  
2  2230700.0  
3  1632500.0  
4  1613400.0  


In [None]:
# add technical indicators to each stock symbol
# for added indicators, see ta.add_all_ta_features


2025-05-04 18:28:19,802 - ERROR - No valid delayed results to process.
