Exploratory Data Analysis

In [None]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
import statsmodels.api as sm
from pathlib import Path

# ---------- helper used by the loader ----------
import numpy as np
import pandas as pd

def prepare_intraday_data(df: pd.DataFrame, tz: str = "America/New_York") -> pd.DataFrame:
    """
    Standardize raw Alpaca 1-min bars so later EDA/backtests see the same shape.

    Input df must at least have: ['timestamp','close'] (plus open/high/low/volume/vwap if available).

    Steps
    -----
    1) Parse timestamps and convert to NYSE timezone.
    2) Keep only Regular Trading Hours (09:30–16:00).
    3) Sort chronologically.
    4) Add day label and compute intraday returns:
       - 'return'     : simple pct change
       - 'log_return' : log return

    Note: We *don’t* compute overnight returns here. That’s done later
    in `clean_and_enrich`, which also clips extreme intraday returns.
    """
    df = df.copy()

    # 1) robust tz handling (works whether timestamp is naive, UTC, or already tz-aware)
    ts = pd.to_datetime(df["timestamp"], errors="coerce")
    if ts.dt.tz is None:
        ts = ts.dt.tz_localize("UTC").dt.tz_convert(tz)
    else:
        ts = ts.dt.tz_convert(tz)
    df["timestamp"] = ts

    # 2) RTH filter
    df = df.set_index("timestamp").between_time("09:30", "16:00").reset_index()

    # 3) sort
    df = df.sort_values("timestamp").reset_index(drop=True)

    # 4) labels + returns
    df["date"] = df["timestamp"].dt.date
    df["return"] = df["close"].pct_change()
    df["log_return"] = np.log(df["close"] / df["close"].shift(1))

    return df


# -----------------------
# 1. Load multiple years/symbols
# -----------------------
symbols = ["SPY", "QQQ"]   # can extend this list
years = [2020, 2021, 2022, 2023]

data_dir = Path("../data/raw")

all_dfs = {}

for symbol in symbols:
    dfs = []
    for year in years:
        file = data_dir / f"{symbol}_1min_{year}.parquet"
        if file.exists():
            df_year = pd.read_parquet(file).reset_index()
            df_year = prepare_intraday_data(df_year)
            dfs.append(df_year)
    if dfs:
        df_symbol = pd.concat(dfs, ignore_index=True)
        all_dfs[symbol] = df_symbol
        print(f"Loaded {symbol}: {len(df_symbol)} rows across {len(dfs)} years")


In [None]:
# -----------------------
# Helper: clean + add overnight returns
# -----------------------
def clean_and_enrich(df, clip=0.05):
    """
    Cleans data by clipping extreme log returns and adds overnight returns.
    Keeps intraday returns intact except for the first bar of each day.
    """
    df = df.copy()

    # --- Overnight returns ---
    df['date'] = df['timestamp'].dt.date
    df['overnight_return'] = pd.NA
    df['overnight_log_return'] = pd.NA
    
    for date, group in df.groupby('date'):
        if len(group) > 0:
            first_idx = group.index[0]
            prev_idx = first_idx - 1
            if prev_idx in df.index:
                prev_close = df.loc[prev_idx, 'close']
                this_close = df.loc[first_idx, 'close']
                raw_r = (this_close / prev_close) - 1
                log_r = np.log(this_close / prev_close)
                df.loc[first_idx, 'overnight_return'] = raw_r
                df.loc[first_idx, 'overnight_log_return'] = log_r

    # Drop first intraday return each day (so overnight isn’t mixed in)
    first_rows = df.groupby('date').head(1).index
    df.loc[first_rows, 'return'] = pd.NA
    df.loc[first_rows, 'log_return'] = pd.NA

    # --- Clean extreme intraday returns ---
    df.loc[df['log_return'].abs() > clip, 'log_return'] = np.nan

    return df

In [None]:
# -----------------------
# 2. Run analyses per symbol
# -----------------------
for symbol, df in all_dfs.items():
    print("="*80)
    print(f"Analysis for {symbol}")
    print("="*80)

    # Clean + enrich
    df = clean_and_enrich(df, clip=0.05)

    # Plot close vs VWAP
    fig = px.line(
        df,
        x="timestamp",
        y=["close", "vwap"],
        title=f"{symbol} Close vs VWAP ({years[0]}–{years[-1]}, 1-min)"
    )
    fig.show()
    
    # Intraday volume curve
    df['time'] = df['timestamp'].dt.time
    df['date'] = df['timestamp'].dt.date
    avg_volume = df.groupby('time')['volume'].mean().reset_index()
    fig = px.line(
        avg_volume,
        x="time", y="volume",
        title=f"Average Intraday Volume Curve ({symbol})"
    )
    fig.show()

    # Return distribution + Normal overlay
    mu, sigma = df['log_return'].mean(), df['log_return'].std()
    fig = px.histogram(
        df, x="log_return", nbins=300,
        title=f"Distribution of 1-min log returns ({symbol})",
        histnorm="probability density"
    )
    # Overlay normal distribution
    x_vals = np.linspace(
        df['log_return'].dropna().min(),
        df['log_return'].dropna().max(),
        500
    )
    normal_pdf = stats.norm.pdf(x_vals, mu, sigma)
    fig.add_trace(go.Scatter(x=x_vals, y=normal_pdf, mode='lines',
                             name='Normal PDF', line=dict(color="red")))
    fig.show()
    
    # Q-Q plot
    sm.qqplot(df['log_return'].dropna(), line='s')
    plt.title(f'Q-Q Plot of 1-min log returns ({symbol})')
    plt.show()
    
    # Summary stats
    print(f"Mean log return: {df['log_return'].mean()}")
    print(f"Std log return: {df['log_return'].std()}")
    print(f"Min log return: {df['log_return'].min()}")
    print(f"Max log return: {df['log_return'].max()}")
    print(f"Skew: {df['log_return'].skew()}")
    print(f"Kurtosis: {df['log_return'].kurtosis()}")
    
    # Intraday volatility curve
    vol_curve = df.groupby('time')['log_return'].std()
    fig = px.line(
        x=vol_curve.index.astype(str),
        y=vol_curve.values,
        title=f"Average Intraday Volatility Curve ({symbol})",
        labels={'x':'Time of day', 'y':'Std of 1-min log returns'}
    )
    fig.show()
    
    # ACF plots
    returns = df['log_return'].dropna()
    fig, ax = plt.subplots(1,2, figsize=(12,4))
    sm.graphics.tsa.plot_acf(returns, lags=60, zero=False,
                             missing='drop', auto_ylims=True, ax=ax[0])
    ax[0].set_title(f"ACF of 1-min log returns ({symbol})")
    sm.graphics.tsa.plot_acf(returns**2, lags=60, zero=False,
                             missing='drop', auto_ylims=True, ax=ax[1])
    ax[1].set_title(f"ACF of Squared 1-min log returns ({symbol})")
    plt.show()

In [None]:
# -------------------------------
# 3. Cross-symbol relationships
# -------------------------------
if len(all_dfs) > 1:
    # Align on timestamps
    aligned = pd.DataFrame(index=pd.concat([df.set_index('timestamp') for df in all_dfs.values()]).index.unique())
    for symbol, df in all_dfs.items():
        aligned[symbol] = df.set_index('timestamp')['log_return']
    
    aligned = aligned.sort_index().dropna()
    print("Cross-correlation matrix of log returns:")
    print(aligned.corr())
    
    fig = px.imshow(aligned.corr(),
                    text_auto=".2f",
                    title="Cross-correlation of log returns across symbols")
    fig.show()
    
    # Optional: cross-correlogram between two symbols
    s1, s2 = symbols[:2]
    fig, ax = plt.subplots(figsize=(8,4))
    sm.graphics.tsa.plot_ccf(aligned[s1], aligned[s2], lags=60, auto_ylims=True, ax=ax)
    ax.set_title(f"Cross-correlation function: {s1} vs {s2}")
    plt.show()

In [None]:
# ---------------------------------
# 4. Save cleaned and enriched data
# ---------------------------------
from pathlib import Path

output_dir = Path("../data/processed")
output_dir.mkdir(parents=True, exist_ok=True)

for symbol, df in all_dfs.items():
    # Clean one more time for safety
    df_clean = clean_and_enrich(df, clip=0.05)
    
    # Save to processed directory
    save_path = output_dir / f"{symbol}_1min_clean.parquet"
    df_clean.to_parquet(save_path, index=False)
    print(f"✅ Saved cleaned data for {symbol}: {save_path}")
