In [3]:
################################################################################################
####        TITLE: TPOT (HARD)                                                              ####
####        DESCRIPTION: FINANCIAL SENTIMENT ANALYSIS                                       ####
####        AUTHOR: BRADLEY SCOTT                                                           ####
####        UMD ID: 119 775 028                                                             ####
####        DATE: 5NOV2025                                                                  ####
####        REFERENCES USED (see paper for full details):                                   ####
####            ChatGPT 5                                                                   ####
####            FNSPID: A Comprehensive Financial News Dataset in Time Series               ####
####        PYTHON VERSION: 3.11.14                                                         ####    
################################################################################################
'''
[BS11052025] fp_610_000001
[BS11052025] import all necessary modules
'''
import pandas as pd
import os
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from tqdm import tqdm
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import os
from pathlib import Path
import yfinance as yf
from datetime import datetime
import time
import math
from sklearn.linear_model import LinearRegression
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import TimeSeriesSplit
from tpot import TPOTRegressor
from sklearn.metrics import make_scorer, mean_squared_error
from dask.distributed import Client, LocalCluster



In [4]:
'''
[BS11102025] fp_610_000005 
[BS11102025] set file paths (local + Google Drive)
'''
# data directory
BASE = Path("/workspace/data") 

# --- Local file paths ---
file_all = BASE / "All_external.csv"  # optional
file_nq = BASE / "nasdaq_exteral_data.csv"
file_daily_news = BASE / "daily_news_sentiment.parquet"
file_val_tick = BASE / "valid_tickers.csv"

# --- Google Drive files (public links) ---
# To load directly from Google Drive, replace '/view' with '/uc?export=download&id='
def gdrive_parquet_url(file_id: str) -> str:
    return f"https://drive.google.com/uc?export=download&id={file_id}"

file_prices_url = gdrive_parquet_url("1x4ayM-VwgKBUtaLixmo870-q3ddwLnPE")
file_model_url = gdrive_parquet_url("1PmZUpnlb1QBFr6-1s0bkk6wdt4uFMCIO")

In [None]:
'''
[BS11052025] fp_610_000010
[BS11052025] Get the first chunk of the nasdaq_exteral_data.csv file while testing code
    NB: all_external.csv does not have summaries and would need web scraping so not using that
        data at this time
    NB: This was fore testing purposes so commented out
'''
#chunksize = 200_000 

#reader = pd.read_csv(file_nq, chunksize=chunksize, encoding_errors="ignore")

#df_chunk = next(reader)

'\n[BS11052025] fp_610_000010\n[BS11052025] Get the first chunk of the nasdaq_exteral_data.csv file while testing code\n    NB: all_external.csv does not have summaries and would need web scraping so not using that\n        data at this time\n    NB: This was fore testing purposes so commented out\n'

In [None]:
'''
[BS11052025] fp_610_000010b
[BS11052025] Exploring data. Commented out to keep code clean
'''
# Display info and first few rows
#print("Shape:", df_chunk.shape)
#print("\nColumns:\n", df_chunk.columns.tolist())

# Peek at a few rows
#df_chunk.head(3)

# Show which of those columns exist in your DataFrame
#summary_cols = ['Lsa_summary', 'Luhn_summary', 'Textrank_summary', 'Lexrank_summary']
#df_chunk[summary_cols].info()


'\n[BS11052025] fp_610_000010b\n[BS11052025] Exploring data. Commented out to keep code clean\n'

In [5]:
'''
[BS11052025] fp_610_000015
[BS11052025] Build out functions to use VADER for sentiment analysis
    NB: Will use textrank_summary if available, otherwise will use title
'''
tqdm.pandas()

analyzer = SentimentIntensityAnalyzer()

def pick_text(row):
    # prefer Textrank_summary; fallback to title; finally to Article
    for col in ["Textrank_summary", "Article_title", "Article"]:
        if col in row and pd.notna(row[col]) and str(row[col]).strip():
            return str(row[col])
    return ""

def vader_score(text: str) -> float:
    if not text:
        return 0.0
    return analyzer.polarity_scores(text)["compound"]

In [6]:
'''
[BS11052025] fp_610_000020
[BS11052025] Do the sentiment analysis and build out daily_news_sentiment.parquet
    NB: This took 50 minutes to run on my computer but it only needs to be ran once. 
'''
chunksize = 200_000 

# Only create the file if it doesn't already exist so we're not creating it more than once
if not file_daily_news.exists():
    print("üî® daily_news_sentiment.parquet does NOT exist ‚Äî creating it...")

    summary_cols = ["Textrank_summary"]   # suggest simplifying
    base_cols = ["Date","Article_title","Stock_symbol","Article"]
    usecols = list(dict.fromkeys(base_cols + summary_cols))

    writer = None

    for i, chunk in enumerate(pd.read_csv(
            file_nq, chunksize=chunksize, usecols=lambda c: True,
            encoding_errors="ignore"
        )):

        cols_present = [c for c in usecols if c in chunk.columns]
        df = chunk[cols_present].copy()

        # Clean date
        date_col = "Date"
        df[date_col] = pd.to_datetime(df[date_col], errors="coerce").dt.date
        df = df.dropna(subset=[date_col])

        # Expand tickers
        df["Stock_symbol"] = df["Stock_symbol"].astype(str).str.replace(" ", "")
        df = df[df["Stock_symbol"].str.len() > 0]
        df = df.assign(Stock_symbol=df["Stock_symbol"].str.split(",")).explode("Stock_symbol")
        df["Stock_symbol"] = df["Stock_symbol"].str.upper().str.replace(r"[^A-Z\.]", "", regex=True)
        df = df[df["Stock_symbol"].str.len() > 0]

        # Sentiment
        df["_text"] = df.apply(pick_text, axis=1)
        df["_sent"] = df["_text"].map(vader_score).astype("float32")

        # Daily aggregation
        daily = (
            df.groupby(["Stock_symbol", date_col], as_index=False)
              .agg(avg_sentiment=("_sent","mean"),
                   pos_count=("_sent", lambda s: np.sum(s > 0.05)),
                   neg_count=("_sent", lambda s: np.sum(s < -0.05)),
                   news_count=("_text","count"))
        )
        daily = daily.rename(columns={date_col: "date", "Stock_symbol":"ticker"})

        # Append to parquet
        table = pa.Table.from_pandas(daily, preserve_index=False)
        if writer is None:
            writer = pq.ParquetWriter(file_daily_news, table.schema)
        writer.write_table(table)

        print(f"Processed chunk {i}: wrote {len(daily)} daily rows")

    if writer is not None:
        writer.close()

    print("‚úÖ File created:", file_daily_news)

else:
    print("‚úÖ daily_news_sentiment.parquet already exists ‚Äî skipping creation")

‚úÖ daily_news_sentiment.parquet already exists ‚Äî skipping creation


In [None]:
'''
[BS11052025] fp_610_000020b
[BS11052025] Checking the file to see if there is any issues
'''
#df = pd.read_parquet(file_daily_news)
#df.head(20)
#len(df)
#df['ticker'].nunique(), df['date'].nunique()
#pd.set_option('display.max_rows', None)
#df['ticker'].value_counts()
#df.describe()

'\n[BS11052025] fp_610_000020b\n[BS11052025] Checking the file to see if there is any issues\n'

In [7]:
'''
[BS11072025] fp_610_000025
[BS11052025] Load the sentiment analysis for any tickers with data for more than 200 days
'''
# Load daily sentiment (created earlier)
news = pd.read_parquet(file_daily_news)

# Format date
news['date'] = pd.to_datetime(news['date']).dt.date

# ---- ‚úÖ Filter to past 10 years ----
news = news[(news['date'] >= datetime(2014,1,1).date())]

# Clean ticker column: drop NaN and "NAN", "", etc.
news = news[ news['ticker'].notna() ]               # removes actual NaN
news = news[ news['ticker'].str.upper() != "NAN" ]  # removes literal "NAN"
news = news[ news['ticker'].str.strip() != "" ]     # removes blanks

# Remove invalid tickers (must be A‚ÄìZ or dot)
is_valid = news['ticker'].str.match(r'^[A-Z\.]+$')
news = news[is_valid.fillna(False)]

# Start with tickers that have enough coverage for ML dev
min_days = 200
ticker_counts = news['ticker'].value_counts()
tickers = ticker_counts.loc[ticker_counts >= min_days].index.tolist()

len(tickers), tickers[:10]
len(tickers), tickers[:10]

(3578,
 ['GILD', 'BABA', 'MRK', 'CMCSA', 'KO', 'FDX', 'SLB', 'OXY', 'QQQ', 'NEE'])

In [None]:
'''
[BS11072025] fp_610_000030
[BS11072025] filter the sentiment analysis down to just the tickers selected in the last step
    and then find the earliest date and most recent date
    NB: For testing purposes
'''
#news_filtered = news[news['ticker'].isin(tickers)].copy()

#news_filtered['date'].min(), news_filtered['date'].max()


'\n[BS11052025] fp_610_000030\n[BS11052025] filter the sentiment analysis down to just the tickers selected in the last step\n    and then find the earliest date and most recent date\n    NB: For testing purposes\n'

In [8]:
'''
[BS11072025] fp_610_000030
[BS11072025] yFinance does not contain info for most delisted stocks. 
             Of the stocks we have enough data from in the past 10 years, we need to remove
             all the delisted stocks. This will ping yFinance to check if the ticker exists
             and create a list of delisted stocks that we need to remove from our data.
             This took 18 minutes to run so save off the file if it doesn't already exist, otherwise skip.
'''
GLOBAL_START = "2014-01-01"
GLOBAL_END   = "2024-01-09"

def has_prices_in_window(ticker, start=GLOBAL_START, end=GLOBAL_END, max_retries=2):
    """Return True if YF returns any rows for (ticker, start..end)."""
    for attempt in range(1, max_retries + 1):
        try:
            df = yf.download(
                ticker,
                start=start,
                end=end,
                auto_adjust=True,
                threads=False,
                progress=False,
            )
            return not df.empty
        except Exception:
            time.sleep(2 * attempt)   # small exponential backoff
    return False


# -------------------------------------------------------------------
# ‚úÖ IF THE FILE EXISTS ‚Üí LOAD AND SKIP PROCESSING
# -------------------------------------------------------------------
if os.path.exists(file_val_tick):
    print(f"‚úÖ Found existing valid ticker file: {file_val_tick}")
    valid_tickers = pd.read_csv(file_val_tick)["ticker"].tolist()
    print(f"Loaded {len(valid_tickers)} valid tickers.")
    
else:
    # -------------------------------------------------------------------
    # ‚ùó FILE DOES NOT EXIST ‚Üí RUN FULL PROBING + SAVE RESULTS
    # -------------------------------------------------------------------
    print("üîç No valid ticker file found. Running yfinance validation step...")
    
    valid_tickers = []
    invalid_tickers = []

    for t in tickers:      # 'tickers' is your filtered high-coverage list
        ok = has_prices_in_window(t)
        (valid_tickers if ok else invalid_tickers).append(t)

    print(f"‚úÖ Valid: {len(valid_tickers)}  ‚ùå Invalid: {len(invalid_tickers)}")

    # Save the valid ones for next time
    pd.Series(valid_tickers, name="ticker").to_csv(file_val_tick, index=False)
    print(f"üíæ Saved valid tickers to {file_val_tick}")


‚úÖ Found existing valid ticker file: /workspace/data/valid_tickers.csv
Loaded 2644 valid tickers.


In [9]:
'''
[BS11072025] fp_610_000035
[BS11072025] Load valid tickers and narrow sentiment down to just valid tickers
'''
# Load validated tickers  
valid_tickers = pd.read_csv(file_val_tick)["ticker"].tolist()

# Keep only sentiment rows with validated tickers
news = news[news['ticker'].isin(valid_tickers)]

In [10]:
'''
[BS11072025] fp_610_000040
[BS11072025] Get the earliest and latest sentiment date per ticker
    NB: I pad the days by 7 weeks on both ends for computing next-day returns
'''
win = (
    news.groupby("ticker")["date"]
        .agg(min_date='min', max_date='max')
        .reset_index()
)

pad = pd.Timedelta(days=7)
win["start"] = pd.to_datetime(win["min_date"]) - pad
win["end"]   = pd.to_datetime(win["max_date"]) + pad

In [10]:
#### THIS IS AS FAR AS I MADE IT TO CHANGES SO THAT IT WAS IN THE FORMAT REQUIRED ####
#### I WAS USING ANACONDA/VSCode BEFORE THE PR WAS DUE ####

'''
[BS11072025] fp_610_000045
[BS11072025] If the file_prices data has not been saved off
             1) Pull the price data per valid ticker for the times done in step fp_610_000040
             2) put it all together and save it off to file_prices
'''
# ----------------------------------------------------------
# ‚úÖ If file exists ‚Üí load & skip downloading
# ----------------------------------------------------------
if os.path.exists(file_prices):
    print(f"‚úÖ Found existing prices file: {file_prices}")
    prices = pd.read_parquet(file_prices)
    print(f"Loaded prices: {prices['ticker'].nunique()} tickers, {len(prices):,} rows")
    
else:
    # ------------------------------------------------------
    # ‚ùó File does NOT exist ‚Üí run full download
    # ------------------------------------------------------
    print("üîç No saved prices found ‚Äî downloading from yfinance...")

    BATCH_SIZE = 25
    MAX_RETRIES = 4
    BASE_SLEEP = 8

    def chunked(lst, n):
        for i in range(0, len(lst), n):
            yield lst[i:i+n]

    frames = []
    failed_soft = []
    failed_hard = []
    downloaded_ok = set()

    for batch_idx, batch in enumerate(chunked(valid_tickers, BATCH_SIZE), start=1):

        # Per-batch date ranges
        sub = win[win["ticker"].isin(batch)]
        start = sub["start"].min().strftime("%Y-%m-%d")
        end   = sub["end"].max().strftime("%Y-%m-%d")

        tries = 0
        while True:
            try:
                print(f"[Batch {batch_idx}] Downloading {len(batch)} tickers...")
                px = yf.download(
                    batch,
                    start=start,
                    end=end,
                    auto_adjust=True,
                    group_by="ticker",
                    progress=False,
                    threads=False
                )

                # Determine which tickers we got data for
                if isinstance(px.columns, pd.MultiIndex):
                    got = set(px.columns.get_level_values(0))
                else:
                    got = set(batch) if not px.empty else set()

                # For each ticker successfully returned, flatten & save
                if not px.empty and isinstance(px.columns, pd.MultiIndex):
                    for t in got:
                        p = px[t].reset_index().rename(columns={"Date": "date"})
                        p["ticker"] = t
                        p["date"] = pd.to_datetime(p["date"]).dt.date
                        frames.append(p)
                        downloaded_ok.add(t)

                # Missing tickers
                missing = [t for t in batch if t not in got]
                for t in missing:
                    failed_hard.append((t, "no_price_data_in_window"))

                break  # batch done

            except Exception as e:
                tries += 1
                if tries <= MAX_RETRIES:
                    sleep_s = BASE_SLEEP * (2 ** (tries - 1))
                    print(f"   Error: {e} ‚Äî retrying in {sleep_s:.0f}s...")
                    time.sleep(sleep_s)
                else:
                    failed_soft.append((batch, f"batch_error:{repr(e)}"))
                    break

    print("‚úÖ Finished downloading price data")

    # ------------------------------------------------------
    # ‚úÖ Assemble `prices` DataFrame
    # ------------------------------------------------------
    prices = (
        pd.concat(frames, ignore_index=True)
          .rename(columns={
              "Open":"open","High":"high","Low":"low",
              "Close":"close","Adj Close":"adj_close","Volume":"volume"
          })
    )
    if "adj_close" not in prices.columns:
        prices["adj_close"] = prices["close"]

    # ------------------------------------------------------
    # ‚úÖ Save for future use
    # ------------------------------------------------------
    prices.to_parquet(file_prices, index=False)
    print(f"üíæ Saved prices to {file_prices}")

‚úÖ Found existing prices file: C:\Users\Brad\Documents\MSML610 project\Project data\ticker_prices.parquet
Loaded prices: 2644 tickers, 6,651,728 rows


In [11]:
'''
[BS11072025] fp_610_000050
[BS11072025] Trim the price data to each ticker's window
'''
# merge windows
temp = win[["ticker", "min_date", "max_date"]].copy()
temp["min_date"] = pd.to_datetime(temp["min_date"]).dt.date
temp["max_date"] = pd.to_datetime(temp["max_date"]).dt.date

prices = prices.merge(temp, on="ticker", how="inner")
prices = prices[
    (prices["date"] >= prices["min_date"]) &
    (prices["date"] <= prices["max_date"])
]
prices = prices.drop(columns=["min_date","max_date"])

In [12]:
'''
[BS11072025] fp_610_000055
[BS11072025] Merge sentiment data and price data, fill 0 for non news days 
'''
df = prices.merge(
    news,
    on=["ticker", "date"],
    how="left"
).sort_values(["ticker", "date"])

# fill missing sentiment days with 0 indicating it is a news-free day
sent_cols = ["avg_sentiment","pos_count","neg_count","news_count"]
for c in sent_cols:
    df[c] = df[c].fillna(0.0)

In [13]:
'''
[BS11072025] fp_610_000060
[BS11072025] Feature enggineer additional flags
'''
# ‚úÖ If the model file already exists ‚Üí load it and skip all heavy feature engineering
if file_model.exists():
    print("‚úÖ Found existing engineered model file. Loading...")
    df_model = pd.read_parquet(file_model)
    print(f"Loaded df_model with shape: {df_model.shape}")
else:
    print("üîç No model file found. Running full feature engineering...")

    # 1-day ahead return
    df["ret_1d"] = (
        df.groupby("ticker")["adj_close"]
        .pct_change(fill_method=None)
        .shift(-1)
    )

    # lag features (prevent leakage)
    df["sent_lag1"] = df.groupby("ticker")["avg_sentiment"].shift(1)

    # example rolling window
    df["sent_roll3"] = (
        df.groupby("ticker")["avg_sentiment"]
        .apply(lambda s: s.shift(1).rolling(3, min_periods=1).mean())
        .values
    )

    # Drop rows without a target
    df_model = df.dropna(subset=["ret_1d"]).reset_index(drop=True)

    # lagged returns
    df = df.sort_values(["ticker", "date"]).copy()

    df["ret_1d_past"] = (
        df.groupby("ticker")["adj_close"].pct_change(fill_method=None).shift(1)
    )
    df["ret_5d_past"] = (
        df.groupby("ticker")["adj_close"].pct_change(5, fill_method=None).shift(1)
    )

    df["ret_10d_past"] = (
        df.groupby("ticker")["adj_close"].pct_change(10, fill_method=None).shift(1)
    )

    # rolling volatility 10 and 20 day
    df["vol_roll10"] = (
        df.groupby("ticker")["adj_close"]
        .apply(lambda s: s.pct_change(fill_method=None).shift(1)
                                .rolling(10, min_periods=5).std())
        .values
    )

    df["vol_roll20"] = (
        df.groupby("ticker")["adj_close"]
        .apply(lambda s: s.pct_change(fill_method=None).shift(1)
                                .rolling(20, min_periods=5).std())
        .values
    )

    # rolling sentiment mean per 7 day
    df["sent_roll7"] = (
        df.groupby("ticker")["avg_sentiment"]
        .apply(lambda s: s.shift(1).rolling(7, min_periods=3).mean())
        .values
    )

    # rolling sentiment count per 7 day
    df["news_count_roll7"] = (
        df.groupby("ticker")["news_count"]
        .apply(lambda s: s.shift(1).rolling(7, min_periods=3).sum())
        .values
    )

    # volume based features
    df["vol_lag1"] = df.groupby("ticker")["volume"].shift(1)

    df["vol_roll10"] = (
        df.groupby("ticker")["volume"]
        .apply(lambda s: s.shift(1).rolling(10, min_periods=5).mean())
        .values
    )

    df["vol_surge"] = df["volume"] / df["vol_roll10"]

    # slop of recent price movement
    def rolling_slope(x):
        if len(x) < 5:
            return np.nan
        y = x.values.reshape(-1,1)
        X = np.arange(len(x)).reshape(-1,1)
        model = LinearRegression().fit(X, y)
        return model.coef_[0][0]

    df["trend_slope_5"] = (
        df.groupby("ticker")["adj_close"]
        .apply(lambda s: s.shift(1).rolling(5).apply(rolling_slope, raw=False))
        .values
    )

    # binary sentiment flags
    df["sent_pos"] = (df["avg_sentiment"] > 0).astype(int)
    df["sent_neg"] = (df["avg_sentiment"] < 0).astype(int)

    # overnight return
    df["overnight_ret"] = df.groupby("ticker")["open"].shift(-1) / df["close"] - 1

    # Final model df
    df_model = df.dropna(subset=["ret_1d"]).reset_index(drop=True)

    # Save the final engineered model dataset
    df_model.to_parquet(file_model, index=False)
    print(f"üíæ Saved engineered model to {file_model}")

‚úÖ Found existing engineered model file. Loading...
Loaded df_model with shape: (6150445, 28)


In [14]:
'''
[BS11072025] fp_610_000065
[BS11072025] load df_model
'''
df_model = pd.read_parquet(file_model)
df_model["date"] = pd.to_datetime(df_model["date"]).dt.date  # ensure date type

In [15]:
'''
[BS11072025] fp_610_000070
[BS11072025] define target and feature columns and narrow down to only rows with all features present
'''
# Target
y = df_model["ret_1d"].astype("float32")

# Candidate features: include what we engineered; take intersection with existing columns
candidate_feats = [
    # sentiment
    "avg_sentiment","pos_count","neg_count","news_count",
    "sent_lag1","sent_roll3","sent_roll7","news_count_roll7",
    "sent_pos","sent_neg",
    # price/volume context
    "adj_close","open","high","low","close","volume",
    "ret_1d_past","ret_5d_past","ret_10d_past",
    "vol_roll10","vol_roll20","vol_lag1","vol_surge",
    "trend_slope_5","overnight_ret",
]
feat_cols = [c for c in candidate_feats if c in df_model.columns]

# Keep only rows with all features present
X = df_model[["ticker","date"] + feat_cols].copy()
mask = X[feat_cols].notna().all(axis=1) & y.notna()
X, y = X.loc[mask].reset_index(drop=True), y.loc[mask].reset_index(drop=True)

print(f"Using {len(feat_cols)} features:", feat_cols)

Using 25 features: ['avg_sentiment', 'pos_count', 'neg_count', 'news_count', 'sent_lag1', 'sent_roll3', 'sent_roll7', 'news_count_roll7', 'sent_pos', 'sent_neg', 'adj_close', 'open', 'high', 'low', 'close', 'volume', 'ret_1d_past', 'ret_5d_past', 'ret_10d_past', 'vol_roll10', 'vol_roll20', 'vol_lag1', 'vol_surge', 'trend_slope_5', 'overnight_ret']


In [16]:
'''
[BS11072025] fp_610_000075
[BS11072025] Do train/test split
'''
# Data goes up to 2024-01-09 so use approximately last 12 months of data as test
cutoff = pd.to_datetime("2023-01-01").date()
train_idx = X["date"] <  cutoff
test_idx  = X["date"] >= cutoff

X_train, X_test = X.loc[train_idx], X.loc[test_idx]
y_train, y_test = y.loc[train_idx], y.loc[test_idx]

print("Train span:", X_train["date"].min(), "‚Üí", X_train["date"].max(), f"({len(X_train):,} rows)")
print("Test  span:", X_test["date"].min(), "‚Üí", X_test["date"].max(),  f"({len(X_test):,} rows)")

Train span: 2014-01-08 ‚Üí 2022-12-30 (5,465,298 rows)
Test  span: 2023-01-03 ‚Üí 2024-01-08 (482,433 rows)


In [17]:
'''
[BS11072025] fp_610_000080
[BS11072025] Preprocess data by scaling numerics and using one-hot encoding for ticker
'''
num_features = feat_cols
cat_features = ["ticker"]

preprocess = ColumnTransformer(
    transformers=[
        ("num", StandardScaler(), num_features),
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=True), cat_features),
    ],
    remainder="drop",
)

In [18]:
'''
[BS11072025] fp_610_000085
[BS11072025] Limit model 
            NB: I've reduced this to the top 200 tickers for testing purposes
                since I will likely need to offload this computation to a different computer
                with higher RAM
'''
top_n = 200
top_tickers = (X_train.groupby("ticker", as_index=False)
                        .size()
                        .sort_values("size", ascending=False)
                        .head(top_n)["ticker"])

keep = X_train["ticker"].isin(top_tickers)
X_train, y_train = X_train.loc[keep].reset_index(drop=True), y_train.loc[keep].reset_index(drop=True)
print("Train rows after top-N tickers:", len(X_train))

Train rows after top-N tickers: 554695


In [19]:
'''
[BS11072025] fp_610_000090
[BS11072025] Limit model 
            NB: I've reduced the number of features and downcast them for testing purposes
            since I will likely need to offload this computation to a different computer
            with higher RAM
'''
# Start lean: drop the most memory/CPU-heavy features
drop_feats = {"trend_slope_5", "vol_roll20", "vol_roll10", "vol_surge"}
num_features = [c for c in feat_cols if c not in drop_feats]
cat_features = ["ticker"]

for c in num_features:
    X_train[c] = X_train[c].astype("float32", copy=False)

In [20]:
'''
[BS11072025] fp_610_000095
[BS11072025] Remove any infinite of null values
'''
# Clean X_train: remove inf and NaN
X_train = X_train.replace([np.inf, -np.inf], np.nan)

mask = X_train.notna().all(axis=1) & y_train.notna()
X_train = X_train.loc[mask].reset_index(drop=True)
y_train = y_train.loc[mask].reset_index(drop=True)

print("‚úÖ Cleaned X_train shape:", X_train.shape)

‚úÖ Cleaned X_train shape: (554695, 27)


In [21]:

# 1) Tiny, thread-only cluster to avoid Windows nanny/process issues
cluster = LocalCluster(
    n_workers=1,             # keep it simple; scale later if needed
    threads_per_worker=1,    # predictable memory use
    processes=False,         # IMPORTANT: no separate processes ‚áí no nanny
    memory_limit="3GB",      # leave headroom for the OS
    dashboard_address=None,  # don‚Äôt open a port
)
client = Client(cluster)

# (Optional) quick sanity check
print(client)
print(client.scheduler_info()["workers"].keys())

# 2) Time-aware CV as before
tscv = TimeSeriesSplit(n_splits=5)

# 3) TPOT configured for Dask client you created
tpot = TPOTRegressor(
    generations=8,
    population_size=20,
    cv=tscv,
    scorers=["neg_root_mean_squared_error"],  # TPOT 0.12+ expects scorer names
    bigger_is_better=True,                    # maximize neg RMSE = minimize RMSE
    max_eval_time_mins=5,
    n_jobs=1,                  # let Dask control parallelism (we‚Äôre 1 worker anyway)
    processes=False,           # bool, not int
    verbose=2,
    client=client,             # <-- give TPOT the client so it won't create its own
    scatter=True,
    memory_limit="3GB",        # harmless duplicate; TPOT uses this when it creates clusters
)

pipe = Pipeline([("prep", preprocess), ("tpot", tpot)])
pipe.fit(X_train, y_train)

<Client: 'inproc://192.168.68.68/10716/1' processes=1 threads=1, memory=2.79 GiB>
dict_keys(['inproc://192.168.68.68/10716/4'])


Generation:  12%|‚ñà‚ñé        | 1/8 [1:00:04<7:00:32, 3604.70s/it]

Exception: No individuals could be evaluated in the initial population as the max_eval_mins time limit was reached before any individuals could be evaluated.