In [None]:
from preprocess import preprocess_data
from features import *

# Build the base frame from the raw 5-minute CSV via the local `preprocess_data` helper.
# NOTE(review): later cells call `df.write_parquet`, so this is expected to be an
# eager polars DataFrame rather than a LazyFrame — TODO confirm in `preprocess`.
RAW_CSV_PATH = "data/5-min-all.csv"
df = preprocess_data(RAW_CSV_PATH)

# 1. Feature Engineering (Baseline Regression/Classification Models)

## $\text{SMA}_{12}$, $\text{SMA}_{26}$

In [2]:
# Simple moving averages of CLOSE over a short (12) and a long (26) window.
for sma_window in (12, 26):
    df = add_sma(df, column="CLOSE", window=sma_window)

# Remove rows where either SMA is null (presumably the rolling-window warm-up rows).
df = df.drop_nulls(subset=[f"SMA_{w}" for w in (12, 26)])

## $\text{EMA}_{12}$, $\text{EMA}_{26}$

In [3]:
# Exponential moving averages of CLOSE over the same 12/26 windows as the SMAs.
for ema_window in (12, 26):
    df = add_ema(df, column="CLOSE", window=ema_window)

## $\text{MACD}$ (MACD + Signal + Hist)

In [4]:
# MACD line, signal and histogram; the scaling section later selects these as
# MACD / MACD_SIGNAL / MACD_HIST.
df = df.pipe(add_macd)

## $\text{Bollinger Bands}$

In [5]:
# Bollinger bands on CLOSE: 20-bar window, k = 2 standard deviations.
df = df.pipe(add_bollinger_bands, price_col="CLOSE", window=20, k=2)

## $\text{RSI}$

In [6]:
# 14-bar RSI on CLOSE.
df = df.pipe(add_rsi, price_col="CLOSE", window=14)

# Persist the engineered frame so the kernel can be restarted before scaling.
FEATURES_CLEAN_PATH = "data/5min-features-clean.parquet"
df.write_parquet(FEATURES_CLEAN_PATH)

<div style="text-align: center;">
  <span style="color: red; font-size: 48px; font-weight: bold;">
    IMPORTANT: RESTART KERNEL HERE
  </span>
</div>

# 2. Scale / Normalize (Per-Ticker Z-Score)

## Keep only the columns we need, then drop rows with `null` feature values

In [2]:
import polars as pl

df_model = pl.read_parquet("data/5min-features-clean.parquet")

# Engineered inputs that are z-scored in the next section.
feature_cols = [
    "CLOSE",
    "VOL",
    "SMA_12", "SMA_26",
    "EMA_12", "EMA_26",
    "MACD", "MACD_SIGNAL", "MACD_HIST",
    "BB_UPPER_20", "BB_LOWER_20", "RSI_14",
]
meta_cols = ["EXCHANGE", "TICKER", "TIMESTAMP"]
target_col = "UP_NEXT"
cols_to_keep = meta_cols + feature_cols + [target_col]

# Keep the wanted columns in the file's own column order (wanted names missing
# from the file are skipped), then drop rows with a null in any feature.
keep = set(cols_to_keep)
df_model = (
    df_model
    .select([name for name in df_model.columns if name in keep])
    .drop_nulls(subset=feature_cols)
)

## Normalize features per ticker so that high-priced stocks (e.g., AAPL) are comparable with low-priced stocks (e.g., \$3 stocks)

In [3]:
# Uses `df_model` and `feature_cols` from the previous cell (same kernel session).
# The feature list is intentionally not redefined here so the two cannot drift apart.

# NOTE(review): mean/std are computed over each ticker's FULL history, so every
# row's z-score uses statistics from later bars (look-ahead). If these features
# feed a time-based train/test split, fit the statistics on the training window only.


def _zscore_expr(col: str) -> pl.Expr:
    """Return a per-ticker z-score expression for `col`, aliased `<col>_Z`.

    When a ticker's std is 0 (constant feature) or null (e.g. a single-row
    ticker, since polars' std defaults to ddof=1), the result is 0.0 instead of
    NaN/null, which would otherwise pass through the later clipping unchanged.
    """
    mean = pl.col(col).mean().over("TICKER")
    std = pl.col(col).std().over("TICKER")
    return (
        pl.when(std > 0)
        .then((pl.col(col) - mean) / std)
        .otherwise(0.0)
        .alias(f"{col}_Z")
    )


# A single `with_columns` call: every expression reads the unscaled inputs and
# they are evaluated together instead of in 12 sequential passes.
df_model = df_model.with_columns([_zscore_expr(col) for col in feature_cols])

<div style="text-align: center;">
  <span style="color: red; font-size: 48px; font-weight: bold;">
    IMPORTANT: DO NOT RESTART THE KERNEL HERE — the next cell needs the in-memory <code>df_model</code> and <code>feature_cols</code>
  </span>
</div>

## Z-Score Clipping

In [None]:
import os

# Relies on `pl`, `df_model` and `feature_cols` from the previous cells still being
# in memory; nothing has been written to disk since the scaling step.
Z_CLIP = 5.0  # cap on |z|: values beyond ±5 standard deviations are clipped to ±5
z_cols = [f"{c}_Z" for c in feature_cols]
df_model = df_model.with_columns([
    pl.col(c).clip(lower_bound=-Z_CLIP, upper_bound=Z_CLIP).alias(c) for c in z_cols
])

df_model.write_parquet("data/5min-features-model.parquet")

# Remove the intermediate only after the model file has been written. Guarded so
# re-running this cell does not raise FileNotFoundError. Once it is gone, the
# scaling section can only be re-run after re-running feature engineering.
old_file = "data/5min-features-clean.parquet"
if os.path.exists(old_file):
    os.remove(old_file)

# Tests

<div style="text-align: center;">
  <span style="color: red; font-size: 48px; font-weight: bold;">
    IMPORTANT: RESTART KERNEL HERE
  </span>
</div>

In [None]:
import polars as pl

MODEL_PATH = "data/5min-features-model.parquet"
df_model = pl.read_parquet(MODEL_PATH)

# Null count of the target column, displayed as a 1x1 frame.
df_model.null_count().select("UP_NEXT")

In [None]:
# `df_model` is loaded eagerly with `pl.read_parquet`, but collect defensively in
# case it was swapped for a lazy scan.
df_collected = df_model.collect() if isinstance(df_model, pl.LazyFrame) else df_model

print("=" * 80)
print("DATAFRAME OVERVIEW")
print("=" * 80)
# "×" was previously mis-encoded as "Ã—" (UTF-8 bytes decoded as Latin-1).
print(f"\nShape: {df_collected.shape[0]:,} rows × {df_collected.shape[1]} columns")
print(f"\nMemory usage: {df_collected.estimated_size('mb'):.2f} MB")
print(f"\nColumns: {list(df_collected.columns)}")

print("\n" + "=" * 80)
print("DATA TYPES")
print("=" * 80)
# The schema pairs each column name with its dtype; a bare `.dtypes` list drops the names.
print(df_collected.schema)

print("\n" + "=" * 80)
print("FIRST FEW ROWS")
print("=" * 80)
print(df_collected.head(10))

print("\n" + "=" * 80)
print("LAST FEW ROWS")
print("=" * 80)
print(df_collected.tail(10))

print("\n" + "=" * 80)
print("MISSING VALUES")
print("=" * 80)
missing = df_collected.null_count()
print(missing)
total_missing = missing.sum_horizontal().item()
total_cells = df_collected.shape[0] * df_collected.shape[1]
print(f"\nTotal missing values: {total_missing:,}")
# Guard against ZeroDivisionError on an empty frame.
missing_pct = total_missing / total_cells * 100 if total_cells else 0.0
print(f"Percentage of missing values: {missing_pct:.2f}%")

print("\n" + "=" * 80)
print("DUPLICATE ROWS")
print("=" * 80)
# Polars' `is_duplicated` flags EVERY row that occurs more than once (including
# the first occurrence), so this counts all rows involved in duplication.
duplicates = df_collected.is_duplicated().sum()
print(f"Number of duplicate rows: {duplicates:,}")
# Guard against ZeroDivisionError on an empty frame.
dup_pct = duplicates / df_collected.height * 100 if df_collected.height else 0.0
print(f"Percentage of duplicates: {dup_pct:.2f}%")

print("\n" + "=" * 80)
print("STATISTICAL SUMMARY (NUMERICAL COLUMNS)")
print("=" * 80)
# Every numeric dtype (Int8..Int64, UInt*, Float32/Float64, ...). The previous
# Int64/Float64-only check silently dropped e.g. Float32 or Int32 columns from
# this summary and from all the numeric checks below.
numerical_cols = [name for name, dtype in df_collected.schema.items() if dtype.is_numeric()]
if numerical_cols:
    print(df_collected.select(numerical_cols).describe())
else:
    print("No numerical columns.")

print("\n" + "=" * 80)
print("CATEGORICAL COLUMNS ANALYSIS")
print("=" * 80)
categorical_cols = [name for name, dtype in df_collected.schema.items() if dtype == pl.String]
for col in categorical_cols:
    print(f"\n{col}:")
    value_counts = df_collected[col].value_counts().sort("count", descending=True)
    print(f"  Unique values: {df_collected[col].n_unique()}")
    print("  Top 10 values:")
    print(value_counts.head(10))

print("\n" + "=" * 80)
print("DATE/TIME ANALYSIS")
print("=" * 80)
# The modelling frame keeps its time information in TIMESTAMP (see `meta_cols`),
# so that column is analysed too; DATE/TIME are still reported when present.
for time_col in ("DATE", "TIME", "TIMESTAMP"):
    if time_col not in df_collected.columns:
        continue
    time_series = df_collected[time_col]
    print(f"{time_col} range: {time_series.min()} to {time_series.max()}")
    print(f"Unique {time_col.lower()}s: {time_series.n_unique()}")

print("\n" + "=" * 80)
print("OUTLIER DETECTION (IQR METHOD)")
print("=" * 80)
n_rows = df_collected.height
for col in numerical_cols:
    if col in ('DATE', 'TIME', 'PER', 'OPENINT'):  # Skip ID-like columns
        continue
    series = df_collected[col]
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    if q1 is None or q3 is None:
        # Empty or all-null column: no quartiles exist (and no rows to divide by).
        print(f"{col}: skipped (no non-null values)")
        continue
    # Tukey's fences: values more than 1.5 * IQR outside [Q1, Q3].
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    # Null comparisons yield null and are not counted, matching a row filter.
    outliers = ((series < lower_bound) | (series > upper_bound)).sum()
    print(f"{col}: {outliers:,} outliers ({outliers / n_rows * 100:.2f}%)")

print("\n" + "=" * 80)
print("ZERO VALUES CHECK")
print("=" * 80)
row_total = df_collected.shape[0]
for name in numerical_cols:
    n_zero = (df_collected[name] == 0).sum()
    if n_zero > 0:
        print(f"{name}: {n_zero:,} zero values ({n_zero/row_total*100:.2f}%)")

print("\n" + "=" * 80)
print("NEGATIVE VALUES CHECK")
print("=" * 80)
# ID-like columns are excluded from the sign check.
skip_cols = {'DATE', 'TIME', 'PER', 'OPENINT'}
for name in (c for c in numerical_cols if c not in skip_cols):
    n_neg = (df_collected[name] < 0).sum()
    if n_neg > 0:
        print(f"{name}: {n_neg:,} negative values ({n_neg/row_total*100:.2f}%)")

print("\n" + "=" * 80)
print("CORRELATION MATRIX (NUMERICAL FEATURES)")
print("=" * 80)
price_cols = [name for name in numerical_cols if name in ['OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOL']]
if len(price_cols) > 1:
    # Pairwise Pearson correlations via Polars; the diagonal is fixed at 1.0.
    corr_data = [
        [
            1.0 if a == b else round(df_collected.select(pl.corr(a, b)).item(), 3)
            for b in price_cols
        ]
        for a in price_cols
    ]

    # Header row, then one right-aligned row per column.
    print(f"\n{'':>10}" + "".join(f"{name:>12}" for name in price_cols))
    for name, corr_row in zip(price_cols, corr_data):
        print(f"{name:>10}" + "".join(f"{v:>12.3f}" for v in corr_row))

print("\n" + "=" * 80)
print("DATA QUALITY CHECKS")
print("=" * 80)
# OHLC sanity checks, run only when all four price columns are present.
if set(['HIGH', 'LOW', 'OPEN', 'CLOSE']).issubset(df_collected.columns):
    n_inverted = df_collected.filter(pl.col('HIGH') < pl.col('LOW')).height
    print(f"Rows where HIGH < LOW: {n_inverted:,}")

    # OPEN and CLOSE must each lie within the bar's [LOW, HIGH] range.
    for price in ('OPEN', 'CLOSE'):
        n_outside = df_collected.filter(
            (pl.col(price) > pl.col('HIGH')) | (pl.col(price) < pl.col('LOW'))
        ).height
        print(f"Rows where {price} outside [LOW, HIGH]: {n_outside:,}")

print("\n" + "=" * 80)
print("SAMPLE STATISTICS BY CATEGORY")
print("=" * 80)
# (column, plural noun, per-row label, row limit or None to show every group)
category_specs = [
    ('TICKER', 'tickers', 'ticker (top 10)', 10),
    ('EXCHANGE', 'exchanges', 'exchange', None),
]
for cat_col, plural, label, limit in category_specs:
    if cat_col not in df_collected.columns:
        continue
    print(f"\nNumber of unique {plural}:", df_collected[cat_col].n_unique())
    print(f"\nRows per {label}:")
    group_sizes = (
        df_collected
        .group_by(cat_col)
        .agg(pl.len().alias('count'))
        .sort('count', descending=True)
    )
    print(group_sizes if limit is None else group_sizes.head(limit))
