In [None]:
import os, re, io, zipfile, shutil, random, warnings
from pathlib import Path
from typing import List, Set
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

warnings.filterwarnings("ignore")
pd.set_option("display.max_columns", 100)

In [None]:
DATA_DIR   = Path("data")
TMP_DIR    = DATA_DIR / "_extract_tmp"
OUTDIR     = Path("outputs"); OUTDIR.mkdir(parents=True, exist_ok=True)
PLOT_DIR   = OUTDIR / "plots"; PLOT_DIR.mkdir(parents=True, exist_ok=True)

PERQ_CSV   = OUTDIR / "per_security_quarter_means.csv"
STATS_ALL  = OUTDIR / "stats_overall_byType_byQuarter.csv"
COHORT_S   = OUTDIR / "cohort_stats_stocks_baseQuarter.csv"
COHORT_E   = OUTDIR / "cohort_stats_etfs_baseQuarter.csv"
COHORT_T_S = OUTDIR / "cohort_tickers_stocks_baseQuarter.txt"
COHORT_T_E = OUTDIR / "cohort_tickers_etfs_baseQuarter.txt"

RNG_SEED = 23
random.seed(RNG_SEED); np.random.seed(RNG_SEED)

TIME_COL   = "Quarter"
TICKER_COL = "Ticker"
TYPE_COL   = "Type"

RAW_VARS = [
    "Cancels","Trades","LitTrades","OddLots","Hidden","TradesForHidden",
    "OrderVol","TradeVol","LitVol","OddLotVol","HiddenVol","TradeVolForHidden",
    "TradesForOddLots","TradeVolForOddLots"
]

METRICS = {
    "Cancel_to_Trade": ("Cancels","Trades"),
    "Trade_to_Order_Volume_pct": ("TradeVol","OrderVol"),
    "Hidden_Rate_pct": ("TradesForHidden","Trades"),
    "Hidden_Volume_pct": ("TradeVolForHidden","TradeVol"),
    "Oddlot_Rate_pct": ("OddLots","Trades"),
    "Oddlot_Volume_pct": ("OddLotVol","TradeVol"),
}

KEEP_COLS_MIN = ["Date","Security",TICKER_COL,TIME_COL,TYPE_COL]
KEEP_COLS_ALL = KEEP_COLS_MIN + RAW_VARS + list(METRICS.keys())

_ETF_RE = re.compile(r"(ETF|ETP|EXCHANGE\s*TRADED|TRUST)", re.IGNORECASE)
pat_qy1 = re.compile(r"q([1-4])[_-]?(\d{4})_all\.csv$", re.IGNORECASE)
pat_qy2 = re.compile(r"individual_security[_-]?(\d{4})[_-]?q([1-4])\.csv$", re.IGNORECASE)
pat_any_q = re.compile(r"(?i)(?:^|[_-])(q([1-4]))[_-]?((?:20)?\d{2})|((?:20)?\d{2})[_-]?(q([1-4]))")


In [None]:
def read_csv_path_robust(path: Path, usecols=None, chunksize=None):
    try:
        return pd.read_csv(path, engine="c", usecols=usecols, chunksize=chunksize, low_memory=False)
    except Exception:
        return pd.read_csv(path, engine="python", on_bad_lines="skip", usecols=usecols, chunksize=chunksize, low_memory=False)

def unzip_iter_csvs_and_loose_csvs(data_dir: Path, tmp_root: Path):
    if tmp_root.exists(): shutil.rmtree(tmp_root)
    tmp_root.mkdir(parents=True, exist_ok=True)
    for zp in sorted(data_dir.glob("*.zip")):
        try:
            with zipfile.ZipFile(zp) as zf:
                zf.extractall(tmp_root / zp.stem)
                subdir = tmp_root / zp.stem
                cands = [p for p in subdir.rglob("*.csv")]
                if not cands:
                    continue
                yield max(cands, key=lambda p: p.stat().st_size)
        except Exception:
            continue
    for p in sorted(data_dir.glob("*.csv")):
        yield p

def extract_quarter_from_path(path: Path) -> str | None:
    b = path.name
    m = pat_qy1.search(b)
    if m: return f"{m.group(2)}Q{m.group(1)}"
    m = pat_qy2.search(b)
    if m: return f"{m.group(1)}Q{m.group(2)}"
    for part in [path.stem, path.parent.name, path.parent.parent.name]:
        if not part: continue
        mm = pat_any_q.search(part)
        if mm:
            q1, y2, y3, q2 = mm.group(1), mm.group(3), mm.group(4), mm.group(6)
            if q1 and y2:
                y = int(y2); y = (y+2000) if y < 100 else y
                return f"{y}Q{int(q1[-1])}"
            if y3 and q2:
                y = int(y3); y = (y+2000) if y < 100 else y
                return f"{y}Q{int(q2[-1])}"
    return None

def ensure_header(path: Path):
    return (not path.exists()) or (path.stat().st_size == 0)

def to_quarter_series(s: pd.Series) -> pd.Series:
    dt = pd.to_datetime(s, errors="coerce")
    out = pd.Series(pd.NA, index=s.index, dtype="string")
    m = dt.notna() & (dt.dt.year >= 2010) & (dt.dt.year <= 2030)
    if m.any():
        out.loc[m] = (dt.dt.year[m].astype("Int64").astype("string")
                      + "Q" +
                      dt.dt.quarter[m].astype("Int64").astype("string"))
    return out

def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = df.columns.str.replace(r"\('000\)$", "", regex=True)
    if "Ticker" in df.columns:
        df["Ticker"] = df["Ticker"].astype(str).str.strip().str.upper()
    def to_num(cols: List[str]):
        if cols:
            df[cols] = df[cols].apply(pd.to_numeric, errors="coerce")
    if "Trades" not in df.columns:
        parts = [c for c in ("LitTrades","TradesForHidden","TradesForOddLots") if c in df.columns]
        to_num(parts); df["Trades"] = df[parts].sum(axis=1, min_count=1) if parts else pd.NA
    if "TradeVol" not in df.columns:
        if "TradeVolForOddLots" in df.columns:
            df["TradeVol"] = pd.to_numeric(df["TradeVolForOddLots"], errors="coerce")
        else:
            parts = [c for c in ("LitVol","HiddenVol","TradeVolForOddLots") if c in df.columns]
            to_num(parts); df["TradeVol"] = df[parts].sum(axis=1, min_count=1) if parts else pd.NA
    return df

def classify_type(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    if "Security" in df.columns:
        sec = df["Security"].astype(str).str.strip()
        sec_u = sec.str.upper()
        uniq = set(sec_u.unique())
        if uniq.issubset({"STOCK","ETF"}):
            df[TYPE_COL] = sec_u.map({"STOCK":"Stock","ETF":"ETF"}).astype("category")
        else:
            df[TYPE_COL] = np.where(sec.str.contains(_ETF_RE, na=False), "ETF", "Stock").astype("category")
    else:
        df[TYPE_COL] = pd.Series(pd.Categorical([pd.NA]*len(df), categories=["Stock","ETF"]), index=df.index)
    return df

def normalize_type_values(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    if "Type" in df.columns:
        s = df["Type"].astype(str).str.strip().str.lower().replace({"etp":"etf"})
        s = s.map(lambda x: "ETF" if x == "etf" else ("Stock" if x == "stock" else np.nan))
        df["Type"] = pd.Categorical(s, categories=["Stock","ETF"])
    return df

def add_metrics(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    for new_col,(num_col,den_col) in METRICS.items():
        num = pd.to_numeric(df.get(num_col), errors="coerce")
        den = pd.to_numeric(df.get(den_col), errors="coerce")
        ratio = num.div(den).where(den.ne(0))
        if new_col.endswith("_pct"): ratio = ratio * 100.0
        df[new_col] = ratio
    return df

def quarter_from_name_or_date(df: pd.DataFrame, q_hint: str | None) -> pd.Series:
    q_from_date = None
    if "Date" in df.columns:
        q_from_date = to_quarter_series(df["Date"])
    if q_hint is not None:
        if q_from_date is None:
            return pd.Series(q_hint, index=df.index, dtype="string")
        out = q_from_date.copy()
        out = out.fillna(q_hint)
        return out
    else:
        if q_from_date is not None:
            return q_from_date
        return pd.Series(pd.NA, index=df.index, dtype="string")

In [None]:
if PERQ_CSV.exists():
    PERQ_CSV.unlink()

for csv_path in unzip_iter_csvs_and_loose_csvs(DATA_DIR, TMP_DIR):
    q_hint = extract_quarter_from_path(csv_path)
    reader = read_csv_path_robust(csv_path, usecols=None, chunksize=200_000)
    for chunk in reader:
        chunk = normalize_columns(chunk)
        chunk[TIME_COL] = quarter_from_name_or_date(chunk, q_hint)
        chunk = classify_type(chunk)
        chunk = normalize_type_values(chunk)
        chunk = add_metrics(chunk)
        have = [c for c in KEEP_COLS_ALL if c in chunk.columns]
        if not have:
            continue
        chunk = chunk[have]
        chunk = chunk[chunk[TIME_COL].notna() & chunk[TICKER_COL].notna() & chunk[TYPE_COL].notna()]
        vals = [c for c in (RAW_VARS + list(METRICS.keys())) if c in chunk.columns]
        if not vals:
            continue
        if not chunk[vals].notna().any(axis=1).any():
            continue
        chunk = chunk[chunk[vals].notna().any(axis=1)]
        agg = chunk.groupby([TYPE_COL, TIME_COL, TICKER_COL], as_index=False)[vals].mean()
        agg.to_csv(PERQ_CSV, mode="a", index=False, header=ensure_header(PERQ_CSV))

In [None]:
def grouped_descriptive_stats(df: pd.DataFrame, group_cols: List[str], value_cols: List[str]) -> pd.DataFrame:
    def _agg(g: pd.DataFrame):
        out = {}
        for col in value_cols:
            x = pd.to_numeric(g[col], errors='coerce')
            out[(col,'mean')] = np.nanmean(x)
            out[(col,'p25')]  = np.nanpercentile(x,25)
            out[(col,'p50')]  = np.nanpercentile(x,50)
            out[(col,'p75')]  = np.nanpercentile(x,75)
            out[(col,'std')]  = np.nanstd(x, ddof=1) if np.sum(~np.isnan(x))>1 else np.nan
        return pd.Series(out)
    stats = df.groupby(group_cols, dropna=False, sort=True).apply(_agg).reset_index()
    stats.columns = ['_'.join([c for c in col if c]) if isinstance(col,tuple) else col for col in stats.columns]
    return stats

usecols_perq = [TYPE_COL, TIME_COL, TICKER_COL] + RAW_VARS + list(METRICS.keys())
perq = pd.read_csv(PERQ_CSV, usecols=lambda c: c in set(usecols_perq), low_memory=False)

perq[TIME_COL]   = perq[TIME_COL].astype(str).str.strip()
perq[TICKER_COL] = perq[TICKER_COL].astype(str).str.strip()
perq[TYPE_COL]   = perq[TYPE_COL].astype(str).str.strip().str.capitalize()

target_cols_all = [c for c in RAW_VARS + list(METRICS.keys()) if c in perq.columns]

if target_cols_all:
    stats_overall = grouped_descriptive_stats(perq, [TYPE_COL, TIME_COL], target_cols_all)
    stats_overall.to_csv(STATS_ALL, index=False)

def quarter_key(qs: str):
    m = re.match(r"^(\d{4})Q([1-4])$", str(qs))
    return (int(m.group(1)), int(m.group(2))) if m else (9999, 9)

def tickers_in_quarter(df, q, t):
    sub = df[(df[TIME_COL]==q) & (df[TYPE_COL]==t)]
    return set(sub[TICKER_COL].dropna().astype(str).unique())

desired_base = "2012Q1"
available_quarters = sorted(perq[TIME_COL].unique(), key=quarter_key)
s_des = tickers_in_quarter(perq, desired_base, "Stock")
e_des = tickers_in_quarter(perq, desired_base, "ETF") | tickers_in_quarter(perq, desired_base, "Etf")

if (desired_base not in set(perq[TIME_COL])) or (len(s_des) < 100) or (len(e_des) < 100):
    fallback = None
    for q in available_quarters:
        s = tickers_in_quarter(perq, q, "Stock")
        e = tickers_in_quarter(perq, q, "ETF") | tickers_in_quarter(perq, q, "Etf")
        if len(s) >= 100 and len(e) >= 100:
            fallback = q; break
    base_quarter = fallback if fallback else (available_quarters[0] if available_quarters else "NA")
else:
    base_quarter = desired_base

In [None]:
def sample_cohort(df: pd.DataFrame, cohort_quarter: str, type_value: str, n=100, seed=RNG_SEED) -> Set[str]:
    base = df[(df[TIME_COL]==cohort_quarter) & (df[TYPE_COL]==type_value)]
    tickers = base[TICKER_COL].dropna().astype(str).unique()
    k = min(n, len(tickers))
    if k == 0: return set()
    rng = np.random.default_rng(seed)
    return set(rng.choice(tickers, size=k, replace=False).tolist())

cohort_stocks = sample_cohort(perq[[TYPE_COL,TIME_COL,TICKER_COL]], base_quarter, "Stock", 100, RNG_SEED)
cohort_etfs   = sample_cohort(perq[[TYPE_COL,TIME_COL,TICKER_COL]], base_quarter, "ETF",   100, RNG_SEED)
if len(cohort_etfs) < 100:
    cohort_etfs = sample_cohort(perq[[TYPE_COL,TIME_COL,TICKER_COL]], base_quarter, "Etf", 100, RNG_SEED) or cohort_etfs

with open(COHORT_T_S, "w") as f: 
    for t in sorted(cohort_stocks): f.write(t+"\n")
with open(COHORT_T_E, "w") as f: 
    for t in sorted(cohort_etfs): f.write(t+"\n")

def cohort_time_stats(perq_df: pd.DataFrame, cohort_tickers: Set[str], value_cols: List[str]) -> pd.DataFrame:
    if not cohort_tickers:
        return pd.DataFrame(columns=[TIME_COL] + sum([[f"{c}_{s}" for s in ("mean","p25","p50","p75","std")] for c in value_cols], []))
    sub = perq_df[perq_df[TICKER_COL].astype(str).isin(cohort_tickers)].copy()
    return grouped_descriptive_stats(sub, [TIME_COL], value_cols).sort_values(TIME_COL)

stats_stock = cohort_time_stats(perq[perq[TYPE_COL].str.lower()=="stock"], cohort_stocks, target_cols_all)
stats_etf   = cohort_time_stats(perq[perq[TYPE_COL].str.lower().isin(["etf","etp"])], cohort_etfs, target_cols_all)

stats_stock.to_csv(COHORT_S, index=False)
stats_etf.to_csv(COHORT_E, index=False)

In [None]:
def list_metrics_in_df(df: pd.DataFrame) -> List[str]:
    names = []
    for c in df.columns:
        if c == TIME_COL: continue
        if "_" in c:
            names.append(c.rsplit("_",1)[0])
    return sorted(list(set(names)))

def plot_descriptive_stats(df: pd.DataFrame, cohort_name: str, base_q: str):
    if df is None or df.empty:
        return
    metrics = list_metrics_in_df(df)
    if not metrics:
        return
    for m in metrics:
        cols = [c for c in df.columns if c.startswith(m+"_")]
        if not cols: 
            continue
        x = df[TIME_COL].astype(str).tolist()
        plt.figure(figsize=(11,6))
        for c in sorted(cols):
            plt.plot(x, df[c].values, marker="o", label=c.replace(m+"_",""))
        plt.title(f"{cohort_name} ({base_q} cohort): {m} — descriptive stats over time")
        plt.xlabel("Quarter")
        plt.ylabel(m)
        plt.xticks(rotation=45)
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.tight_layout()
        out_path = PLOT_DIR / f"{cohort_name}_{m}.png"
        plt.savefig(out_path, dpi=160)
        plt.close()

plot_descriptive_stats(stats_stock, "StocksCohort", base_quarter)
plot_descriptive_stats(stats_etf, "ETFsCohort", base_quarter)