In [66]:
"""
Full pair trading cluster pipeline trên toàn universe:

1) Lấy danh sách ticker từ folder per_symbol (price volume).
2) Lấy sectorKey, industryKey bằng yahooquery (hoặc đọc từ sector_industry.csv nếu đã có).
3) Tính volatility 1 năm cho toàn bộ universe, chia decile.
4) Tính beta với SPY cho toàn bộ universe.
5) Với từng cặp (sectorKey, industryKey) có >= MIN_GROUP_SIZE:
   - Lọc theo volatility decile (mid vol).
   - Lọc theo beta gần nhau (median ± BETA_TOL).
   - Tính return 3 năm, average dollar volume và chọn dv band.
   - Chọn top tickers theo mean pairwise correlation trong band.
   - Test cointegration tất cả cặp trong cụm con, lấy các mã có pvalue < COINT_ALPHA.
   - Xuất file csv cuối cùng với OHLCV sạch cho các mã cointegrated.

Output: một folder OUTPUT_DIR chứa các file:
  cluster_{sectorKey}_{industryKey}.csv
"""

import os
import re
from functools import reduce
from itertools import combinations

import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import coint

# Optional: chỉ cần khi cần fetch sector/industry lần đầu
# !pip install -q yahooquery
try:
    from yahooquery import Ticker as YQTicker
except ImportError:
    YQTicker = None

# ========================
# CONFIG
# ========================

# Directory holding one OHLCV CSV per symbol (file name = "<TICKER>.csv")
DATA_DIR = "/kaggle/input/computational-finance/yfinance/yfinance/per_symbol"

# Separate SPY price file, used as the market benchmark for beta
SPY_PATH = "/kaggle/input/computational-finance/SPY.csv"

# Pre-built sector/industry table (ticker, sectorKey, industryKey), if available.
# NOTE(review): /kaggle/input is typically read-only, so this path can be read
# but probably not written back if the yahooquery fetch path is taken — confirm.
SECTOR_FILE = "/kaggle/input/computational-finance/sector_industry.csv"

# Destination folder for the final cluster CSVs (created when this cell runs)
OUTPUT_DIR = "clusters"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Thresholds and parameters
MIN_GROUP_SIZE = 50           # minimum number of tickers in a (sectorKey, industryKey) group
VOL_MIN_OBS = 200             # minimum number of daily returns to compute 1y volatility
VOL_LOOKBACK_DAYS = 252       # most recent rows used for 1y volatility (also the SPY beta window)
VOL_DECILES_KEEP = [4, 5, 6]  # volatility deciles kept (1 = lowest vol, 10 = highest)

BETA_MIN_OBS = 200            # minimum number of date-aligned returns to estimate beta
BETA_TOL = 0.2                # keep tickers whose beta lies within median ± 0.2

RET_LOOKBACK_YEARS = 3        # lookback (years) for returns, correlation and dollar volume
RET_MIN_OBS = 200             # intended minimum number of common return dates for the correlation step

DV_BAND_LOW = 0.5             # keep tickers whose avg dollar volume is within [0.5, 2] * median
DV_BAND_HIGH = 2.0

TOP_K_BY_CORR = 20            # keep at most 20 tickers with the highest mean correlation inside the dv band

COINT_LOOKBACK_YEARS = 3      # lookback (years) for the cointegration test
COINT_MIN_OBS = 200           # minimum number of date-aligned prices per tested pair
COINT_ALPHA = 0.05            # a pair counts as cointegrated when pvalue < 0.05


# ========================
# STEP 0: UTILITIES
# ========================

def list_tickers(data_dir=DATA_DIR):
    """
    Return the sorted list of tickers that have a per-symbol CSV in ``data_dir``.

    Only the final ``.csv`` extension is stripped, so symbols that contain a dot
    (``BRK.B.csv`` -> ``BRK.B``) stay intact and match the ``f"{ticker}.csv"``
    path that ``load_ohlcv`` rebuilds from the ticker.
    """
    return sorted(
        os.path.splitext(name)[0]
        for name in os.listdir(data_dir)
        if name.endswith(".csv")
    )


def _first_present(columns, candidates):
    """Return the first name in ``candidates`` found in ``columns``, or None."""
    for name in candidates:
        if name in columns:
            return name
    return None


def load_ohlcv(ticker, data_dir=DATA_DIR):
    """
    Read one per-symbol CSV and return a cleaned OHLCV DataFrame.

    Output columns: Date (datetime64), Open, High, Low, Close, Volume (numeric),
    with a fresh 0..n-1 index. Open/High/Low are NaN when the file lacks them.
    Rows are sorted by Date, keep one row per date (the last occurrence wins),
    and have a non-missing Volume and a strictly positive Close, so downstream
    log returns are finite.

    Besides a plain "Date"/"date" column, the "Price" header layout is accepted:
    the first column is named "Price" and the first two data rows are header
    noise, the second holding the literal "Date".

    Returns None (after printing a warning) when the file is missing or
    unreadable, has no date column, or has no usable Close/Volume rows.

    NOTE(review): "Adj Close" is preferred for Close while Open/High/Low stay
    unadjusted, so O/H/L and Close may be on different scales around splits or
    dividends — confirm this is acceptable for consumers of the output CSVs.
    """
    path = os.path.join(data_dir, f"{ticker}.csv")
    if not os.path.exists(path):
        print(f"[warn] missing file: {path}")
        return None

    try:
        df = pd.read_csv(path)
    except Exception as e:
        print(f"[warn] read error {ticker}: {e}")
        return None

    # Date column
    date_col = _first_present(df.columns, ["Date", "date"])
    if date_col is None:
        # Fallback for the "Price" layout (rare for per_symbol files). The length
        # check avoids a KeyError on files with fewer than two data rows.
        if (
            "Price" in df.columns
            and len(df) > 1
            and str(df["Price"].iloc[1]).strip().lower() == "date"
        ):
            df = df.iloc[2:].copy().rename(columns={"Price": "Date"})
            date_col = "Date"
        else:
            print(f"[warn] no date col {ticker}")
            return None

    df[date_col] = pd.to_datetime(df[date_col], errors="coerce")
    df = df.dropna(subset=[date_col])
    if df.empty:
        print(f"[warn] empty after date parse {ticker}")
        return None

    # Map source columns to canonical names; Close prefers an adjusted series.
    col_map = {
        "Open": _first_present(df.columns, ["Open", "open"]),
        "High": _first_present(df.columns, ["High", "high"]),
        "Low": _first_present(df.columns, ["Low", "low"]),
        "Close": _first_present(
            df.columns, ["Adj Close", "Adj_Close", "adj_close", "Close", "close"]
        ),
        "Volume": _first_present(df.columns, ["Volume", "volume", "Vol", "vol"]),
    }

    if col_map["Close"] is None or col_map["Volume"] is None:
        print(f"[warn] missing Close or Volume for {ticker}")
        return None

    for col in col_map.values():
        if col is not None:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    df = df.dropna(subset=[col_map["Close"], col_map["Volume"]])
    # A zero or negative price would turn log returns into -inf/NaN downstream.
    df = df[df[col_map["Close"]] > 0]
    if df.empty:
        print(f"[warn] empty after numeric {ticker}")
        return None

    # Stable sort so "keep last" is deterministic for a repeated date; duplicate
    # dates would otherwise multiply rows in the Date merges downstream.
    df = df.sort_values(date_col, kind="mergesort")
    df = df.drop_duplicates(subset=[date_col], keep="last")

    out = pd.DataFrame(
        {
            "Date": df[date_col],
            "Open": df[col_map["Open"]] if col_map["Open"] is not None else np.nan,
            "High": df[col_map["High"]] if col_map["High"] is not None else np.nan,
            "Low": df[col_map["Low"]] if col_map["Low"] is not None else np.nan,
            "Close": df[col_map["Close"]],
            "Volume": df[col_map["Volume"]],
        }
    )

    return out.reset_index(drop=True)


# ========================
# STEP 1: SECTOR INDUSTRY
# ========================

def get_sector_industry(tickers, sector_file=SECTOR_FILE):
    """
    Return a table with columns ticker, sectorKey, industryKey.

    If ``sector_file`` exists it is read and returned unchanged. Otherwise the
    asset profiles of ``tickers`` are fetched with yahooquery. Symbols whose
    profile is not a dict, or whose sectorKey is missing, are dropped, and the
    result is cached to ``sector_file``.

    Raises RuntimeError when the file is missing and yahooquery is unavailable.
    If the cache cannot be written (for example, a read-only input directory),
    a warning is printed and the fetched table is still returned rather than
    lost.
    """
    if os.path.exists(sector_file):
        return pd.read_csv(sector_file)

    if YQTicker is None:
        raise RuntimeError(
            "sector_industry.csv không tồn tại và yahooquery không khả dụng. "
            "Cài đặt yahooquery hoặc tạo sector_industry.csv trước."
        )

    profiles = YQTicker(tickers).asset_profile

    # Failed lookups are not dicts (yahooquery error payloads); skip them.
    rows = [
        {
            "ticker": symbol,
            "sectorKey": info.get("sectorKey"),
            "industryKey": info.get("industryKey"),
        }
        for symbol, info in profiles.items()
        if isinstance(info, dict)
    ]

    # Explicit columns keep the schema even when no profile could be fetched.
    df_sector = pd.DataFrame(rows, columns=["ticker", "sectorKey", "industryKey"])
    df_sector = df_sector[df_sector["sectorKey"].notna()]

    try:
        df_sector.to_csv(sector_file, index=False)
    except OSError as e:
        print(f"[warn] could not cache sector table to {sector_file}: {e}")
    return df_sector


# ========================
# STEP 2: VOLATILITY 1Y
# ========================

def compute_1y_vol(ticker, data_dir=DATA_DIR, min_obs=VOL_MIN_OBS,
                   lookback_days=VOL_LOOKBACK_DAYS):
    """
    Annualized volatility of daily Close log returns over the most recent window.

    Parameters
    ----------
    ticker : symbol whose per-symbol CSV is loaded via ``load_ohlcv``.
    data_dir : directory of the per-symbol CSV files.
    min_obs : minimum number of log returns required.
    lookback_days : number of most recent price rows used (default
        VOL_LOOKBACK_DAYS), i.e. at most ``lookback_days - 1`` returns.

    Returns {"ticker": ticker, "vol_1y": std(returns) * sqrt(252)}, or None when
    the data cannot be loaded or fewer than ``min_obs`` returns are available.

    NOTE(review): the window is the ticker's own last rows, not a calendar-aligned
    window, so a ticker whose history ends early is measured on an older period.
    """
    df = load_ohlcv(ticker, data_dir=data_dir)
    if df is None:
        return None

    closes = df.sort_values("Date").tail(lookback_days)["Close"].astype(float)
    log_rets = np.log(closes).diff().dropna()

    if log_rets.shape[0] < min_obs:
        return None

    vol_1y = log_rets.std() * np.sqrt(252.0)
    return {"ticker": ticker, "vol_1y": vol_1y}


def compute_all_vols(tickers, data_dir=DATA_DIR):
    """
    Compute 1y volatility for every ticker and assign a volatility decile.

    Returns a DataFrame with columns ticker, vol_1y and vol_decile, where
    vol_decile runs from 1 (lowest vol) to 10 (highest). Tickers with too little
    data, or with non-finite or non-positive volatility, are dropped. The
    columns exist even when no ticker survives.
    """
    rows = []
    for tk in tickers:
        res = compute_1y_vol(tk, data_dir=data_dir)
        if res is not None:
            rows.append(res)

    df_vol = pd.DataFrame(rows, columns=["ticker", "vol_1y"])
    df_vol = df_vol.replace([np.inf, -np.inf], np.nan).dropna(subset=["vol_1y"])
    df_vol = df_vol[df_vol["vol_1y"] > 0].copy()

    if df_vol.empty:
        df_vol["vol_decile"] = pd.Series(dtype="int64")
        return df_vol

    # qcut over ranks (ties broken by row order) instead of raw values. This
    # gives identical deciles when all vols are distinct, and does not raise
    # "Bin edges must be unique" when several tickers share a volatility.
    ranks = df_vol["vol_1y"].rank(method="first")
    df_vol["vol_decile"] = pd.qcut(ranks, 10, labels=False) + 1
    return df_vol


# ========================
# STEP 3: BETA VS SPY
# ========================

def load_spy(spy_path=SPY_PATH):
    """
    Load the SPY benchmark file as a two-column DataFrame [Date, Close].

    Two layouts are accepted:
    - a regular "Date"/"date" column;
    - the "Price" layout, where the first two data rows are header noise and
      the second holds the literal "Date".

    Close prefers an adjusted-close column when one is present. Rows are sorted
    by Date, with one row per date (the last occurrence wins) and a positive
    price. The result has a fresh 0..n-1 index.

    Returns None when no date or price column can be identified, or when no
    valid numeric price remains.
    """
    df = pd.read_csv(spy_path)

    if "Date" in df.columns or "date" in df.columns:
        date_col = "Date" if "Date" in df.columns else "date"
    elif (
        "Price" in df.columns
        and len(df) > 1  # guard: a 0/1-row file would raise KeyError below
        and str(df["Price"].iloc[1]).strip().lower() == "date"
    ):
        df = df.iloc[2:].copy().rename(columns={"Price": "Date"})
        date_col = "Date"
    else:
        return None

    df[date_col] = pd.to_datetime(df[date_col], errors="coerce")
    df = df.dropna(subset=[date_col])

    price_col = None
    for col in ["Adj Close", "Adj_Close", "adj_close", "Close", "close"]:
        if col in df.columns:
            price_col = col
            break
    if price_col is None:
        return None

    df[price_col] = pd.to_numeric(df[price_col], errors="coerce")
    df = df.dropna(subset=[price_col])
    df = df[df[price_col] > 0]  # log returns need strictly positive prices
    if df.empty:
        return None

    # Stable sort + dedupe so each date maps to exactly one price in later merges.
    df = df.sort_values(date_col, kind="mergesort")
    df = df.drop_duplicates(subset=[date_col], keep="last")

    df = df[[date_col, price_col]].rename(columns={date_col: "Date", price_col: "Close"})
    return df.reset_index(drop=True)


def compute_spy_returns(spy_path=SPY_PATH, lookback_days=VOL_LOOKBACK_DAYS):
    """
    Daily SPY log returns over the most recent ``lookback_days`` price rows.

    Returns a DataFrame [Date, r_m] with a fresh 0..n-1 index. The first row of
    the window has no previous price, so its return is dropped.
    Raises RuntimeError if SPY cannot be loaded or is empty.
    """
    spy_prices = load_spy(spy_path)
    if spy_prices is None or spy_prices.empty:
        raise RuntimeError("Không load được SPY")

    window = spy_prices.sort_values("Date").tail(lookback_days)
    with_ret = window.assign(r_m=np.log(window["Close"]).diff())
    return with_ret[["Date", "r_m"]].dropna().reset_index(drop=True)


def compute_beta_vs_spy(ticker, spy_ret, data_dir=DATA_DIR, min_obs=BETA_MIN_OBS):
    """
    OLS beta of a ticker's daily Close log returns against SPY log returns.

    Steps:
    1. Restrict the ticker's prices to the SPY return window
       [spy_ret.Date.min(), spy_ret.Date.max()] and turn them into log returns.
    2. Inner-join those returns with ``spy_ret`` on Date.
    3. beta = cov(r_i, r_m) / var(r_m), both with ddof=1.

    Returns {"ticker": ticker, "beta_spy": beta}. Returns None when the data
    cannot be loaded, when fewer than ``min_obs`` dates align, or when SPY
    variance is zero.
    """
    prices = load_ohlcv(ticker, data_dir=data_dir)
    if prices is None:
        return None

    first_day = spy_ret["Date"].min()
    last_day = spy_ret["Date"].max()
    in_window = prices[prices["Date"].between(first_day, last_day)]

    stock_ret = in_window.assign(r_i=np.log(in_window["Close"]).diff())
    stock_ret = stock_ret[["Date", "r_i"]].dropna()

    aligned = stock_ret.merge(spy_ret, on="Date", how="inner")
    if aligned.shape[0] < min_obs:
        return None

    r_i = aligned["r_i"].to_numpy()
    r_m = aligned["r_m"].to_numpy()

    market_var = np.var(r_m, ddof=1)
    if market_var == 0:
        return None

    covariance = np.cov(r_i, r_m, ddof=1)[0, 1]
    return {"ticker": ticker, "beta_spy": covariance / market_var}


def compute_all_betas(tickers, spy_ret, data_dir=DATA_DIR):
    """
    Compute beta vs SPY for every ticker.

    Returns a DataFrame with columns ticker and beta_spy. Tickers for which
    ``compute_beta_vs_spy`` returns None are omitted. The columns exist even
    when no beta could be computed, so a later merge on "ticker" does not
    raise KeyError.
    """
    rows = []
    for tk in tickers:
        res = compute_beta_vs_spy(tk, spy_ret, data_dir=data_dir)
        if res is not None:
            rows.append(res)
    return pd.DataFrame(rows, columns=["ticker", "beta_spy"])


# ========================
# STEP 4: RETURN MATRIX, DOLLAR VOLUME, CORRELATION
# ========================

def build_price_dict(tickers, data_dir=DATA_DIR):
    """
    Load each ticker's OHLCV frame into a dict {ticker: DataFrame}.

    Tickers that ``load_ohlcv`` cannot load are left out. Key order follows
    ``tickers``.
    """
    loaded = ((tk, load_ohlcv(tk, data_dir=data_dir)) for tk in tickers)
    return {tk: frame for tk, frame in loaded if frame is not None}


def build_common_return_matrix(price_data, lookback_years=RET_LOOKBACK_YEARS):
    """
    Build a wide matrix of daily Close log returns on dates shared by all tickers.

    The window is the last ``lookback_years`` years (365 days per year) of the
    period that every ticker covers:

        range_end   = earliest last date across tickers
        range_start = max(latest first date, range_end - 365 * lookback_years days)

    Each ticker's log returns are computed inside that window, and the
    per-ticker series are inner-joined on Date.

    Returns (df_returns, range_start, range_end):
    - df_returns has a Date column plus one return column per ticker, sorted by
      Date. It may have zero rows when the tickers' histories do not overlap.
    - (None, None, None) is returned when ``price_data`` is empty.
    """
    if not price_data:
        return None, None, None

    latest_start = max(df["Date"].min() for df in price_data.values())
    earliest_end = min(df["Date"].max() for df in price_data.values())

    lookback_start = pd.Timestamp(earliest_end) - pd.Timedelta(days=365 * lookback_years)
    range_start = max(latest_start, lookback_start)
    range_end = earliest_end

    ret_list = []
    for tk, df in price_data.items():
        df_win = df[(df["Date"] >= range_start) & (df["Date"] <= range_end)].copy()
        df_win["r"] = np.log(df_win["Close"]).diff()
        df_ret = df_win[["Date", "r"]].dropna().rename(columns={"r": tk})
        ret_list.append(df_ret)

    # Inner join keeps only the dates on which every ticker has a return.
    df_returns = reduce(
        lambda left, right: pd.merge(left, right, on="Date", how="inner"),
        ret_list,
    )

    df_returns = df_returns.sort_values("Date").reset_index(drop=True)
    return df_returns, range_start, range_end


def compute_avg_dollar_volume(price_data, range_start, range_end):
    """
    Mean daily dollar volume (Close * Volume) per ticker within [range_start, range_end].

    Parameters
    ----------
    price_data : dict {ticker: DataFrame with Date, Close, Volume columns}.
    range_start, range_end : inclusive date bounds of the window.

    Returns a DataFrame with columns ticker and avg_dollar_vol. Tickers with no
    rows in the window, or whose mean is NaN, are omitted. The columns exist
    even when the result is empty, so callers can test ``.empty`` safely.
    """
    rows = []
    for tk, df in price_data.items():
        in_range = (df["Date"] >= range_start) & (df["Date"] <= range_end)
        if not in_range.any():
            continue
        dollar_vol = df.loc[in_range, "Close"] * df.loc[in_range, "Volume"]
        rows.append({"ticker": tk, "avg_dollar_vol": dollar_vol.mean()})

    df_dv = pd.DataFrame(rows, columns=["ticker", "avg_dollar_vol"])
    return df_dv.dropna(subset=["avg_dollar_vol"])


# ========================
# STEP 5: COINTEGRATION
# ========================

def find_cointegrated_pairs(df_cluster, tickers,
                            lookback_years=COINT_LOOKBACK_YEARS,
                            min_obs=COINT_MIN_OBS,
                            alpha=COINT_ALPHA):
    """
    Run an Engle-Granger cointegration test on every pair of ``tickers``.

    Parameters
    ----------
    df_cluster : long DataFrame with at least Date, ticker and Close columns.
    tickers : tickers to pair up. Pair order follows ``itertools.combinations``.
    lookback_years : only rows within ``365 * lookback_years`` days of the
        latest date in ``df_cluster`` are used.
    min_obs : pairs with fewer date-aligned observations are skipped.
    alpha : a pair is "good" when its pvalue < alpha.

    Returns
    -------
    res_df : DataFrame[ticker1, ticker2, pvalue, stat] sorted by pvalue. It is
        an empty DataFrame when no pair could be tested.
    good_pairs : list of (ticker1, ticker2, pvalue) with pvalue < alpha.

    NOTE(review): per the statsmodels docs, ``coint(y1, y2)`` uses y1 as the
    dependent variable of the first-stage regression. Only one direction is
    tested, and testing many pairs at a fixed alpha inflates false positives.
    """
    df_cluster = df_cluster.copy()
    df_cluster["Date"] = pd.to_datetime(df_cluster["Date"])

    start_cut = df_cluster["Date"].max() - pd.Timedelta(days=365 * lookback_years)
    df_win = df_cluster[df_cluster["Date"] >= start_cut]

    # Split the window once per ticker instead of re-filtering the whole frame
    # for every pair. One row per date keeps the pairwise merge from fanning out.
    series_by_ticker = {
        tk: grp[["Date", "Close"]].drop_duplicates(subset="Date", keep="last")
        for tk, grp in df_win.groupby("ticker", sort=False)
    }

    results = []
    good_pairs = []

    for t1, t2 in combinations(tickers, 2):
        s1 = series_by_ticker.get(t1)
        s2 = series_by_ticker.get(t2)
        if s1 is None or s2 is None:
            continue  # ticker has no rows inside the window

        merged = pd.merge(s1, s2, on="Date", how="inner", suffixes=("_1", "_2"))
        merged = merged.dropna()
        if merged.shape[0] < min_obs:
            continue

        y1 = np.log(merged["Close_1"].values)
        y2 = np.log(merged["Close_2"].values)

        try:
            stat, pvalue, _crit = coint(y1, y2)
        except Exception as e:
            print(f"[warn] coint error {t1}-{t2}: {e}")
            continue

        results.append(
            {
                "ticker1": t1,
                "ticker2": t2,
                "pvalue": pvalue,
                "stat": stat,
            }
        )

        if pvalue < alpha:
            good_pairs.append((t1, t2, pvalue))

    res_df = pd.DataFrame(results).sort_values("pvalue") if results else pd.DataFrame()
    return res_df, good_pairs


def build_cluster_dataset(tickers, data_dir=DATA_DIR):
    """
    Stack the OHLCV history of ``tickers`` into one long DataFrame.

    Each loaded frame is tagged with a ``ticker`` column. Tickers that
    ``load_ohlcv`` cannot load are skipped. Returns the columns
    [Date, ticker, Open, High, Low, Close, Volume], sorted by Date then ticker
    with a fresh index, or None when nothing could be loaded.
    """
    loaded = ((tk, load_ohlcv(tk, data_dir=data_dir)) for tk in tickers)
    frames = [frame.assign(ticker=tk) for tk, frame in loaded if frame is not None]
    if len(frames) == 0:
        return None

    stacked = pd.concat(frames, ignore_index=True)
    stacked = stacked.sort_values(["Date", "ticker"]).reset_index(drop=True)
    column_order = ["Date", "ticker", "Open", "High", "Low", "Close", "Volume"]
    return stacked[column_order]


# ========================
# STEP 6: GROUP PIPELINE
# ========================

def _rank_by_mean_corr(ret_mat):
    """Return ret_mat's columns ordered by mean correlation with the others, highest first."""
    corr = ret_mat.corr()
    mean_corr = {tk: corr[tk].drop(tk).mean() for tk in corr.columns}
    # A NaN mean (e.g. a constant return series) would make sorted() order
    # unpredictable; rank it below every real correlation instead.
    return sorted(
        mean_corr,
        key=lambda tk: -np.inf if np.isnan(mean_corr[tk]) else mean_corr[tk],
        reverse=True,
    )


def process_group(sector, industry, df_group, data_dir=DATA_DIR):
    """
    Run the cluster-selection pipeline for one (sectorKey, industryKey) group.

    Steps:
      1. keep tickers whose vol_decile is in VOL_DECILES_KEEP;
      2. keep tickers whose beta_spy lies within median ± BETA_TOL;
      3. build the common log-return matrix over RET_LOOKBACK_YEARS and require
         at least RET_MIN_OBS common return dates;
      4. keep tickers whose average dollar volume is within
         [DV_BAND_LOW, DV_BAND_HIGH] * median;
      5. rank those tickers by mean pairwise return correlation and keep the
         top TOP_K_BY_CORR;
      6. test every pair for cointegration and keep the tickers that appear in
         at least one pair with pvalue < COINT_ALPHA.

    Parameters
    ----------
    sector, industry : group keys. They are not used in the computation itself.
    df_group : universe rows for this group, with columns ticker, vol_decile
        and beta_spy.
    data_dir : directory of the per-symbol CSV files.

    Returns the long OHLCV DataFrame (Date, ticker, Open, High, Low, Close,
    Volume) of the cointegrated tickers, sorted by Date and ticker. Returns None
    when any of these hold:
    - a step leaves fewer than two tickers;
    - there are too few common return dates;
    - no pair is cointegrated.
    """
    # 1. Mid-volatility deciles
    sub = df_group[df_group["vol_decile"].isin(VOL_DECILES_KEEP)]
    if sub.shape[0] < 2:
        return None

    # 2. Beta close to the group median (NaN betas fall outside the band)
    beta_center = sub["beta_spy"].median()
    sub_beta = sub[
        sub["beta_spy"].between(beta_center - BETA_TOL, beta_center + BETA_TOL)
    ]
    if sub_beta.shape[0] < 2:
        return None

    tickers = sub_beta["ticker"].unique().tolist()

    # 3. Prices and the common return matrix
    price_data = build_price_dict(tickers, data_dir=data_dir)
    if len(price_data) < 2:
        return None

    df_returns, range_start, range_end = build_common_return_matrix(
        price_data,
        lookback_years=RET_LOOKBACK_YEARS,
    )
    # Need Date + at least two ticker columns, and enough shared dates; an empty
    # or very short overlap window would give NaN or unreliable correlations.
    if (
        df_returns is None
        or df_returns.shape[1] <= 2
        or df_returns.shape[0] < RET_MIN_OBS
    ):
        return None

    # 4. Dollar-volume band around the median
    df_dv = compute_avg_dollar_volume(price_data, range_start, range_end)
    if df_dv.empty:
        return None

    median_dv = df_dv["avg_dollar_vol"].median()
    in_band = df_dv["avg_dollar_vol"].between(
        median_dv * DV_BAND_LOW, median_dv * DV_BAND_HIGH
    )
    band_tickers = set(df_dv.loc[in_band, "ticker"])
    if len(band_tickers) < 2:
        return None

    # 5. Top tickers by mean pairwise correlation inside the band
    ret_mat = df_returns.drop(columns=["Date"])
    valid_cols = [c for c in ret_mat.columns if c in band_tickers]
    if len(valid_cols) < 2:
        return None

    cluster_tickers = _rank_by_mean_corr(ret_mat[valid_cols])[:TOP_K_BY_CORR]

    # 6. Cointegration inside the selected cluster
    df_cluster = build_cluster_dataset(cluster_tickers, data_dir=data_dir)
    if df_cluster is None:
        return None

    _res_df, good_pairs = find_cointegrated_pairs(
        df_cluster,
        cluster_tickers,
        lookback_years=COINT_LOOKBACK_YEARS,
        min_obs=COINT_MIN_OBS,
        alpha=COINT_ALPHA,
    )
    if not good_pairs:
        return None

    cointegrated_tickers = sorted({t for t1, t2, _ in good_pairs for t in (t1, t2)})

    return (
        df_cluster[df_cluster["ticker"].isin(cointegrated_tickers)]
        .sort_values(["Date", "ticker"])
        .reset_index(drop=True)
    )


# ========================
# STEP 7: MAIN PIPELINE
# ========================

def run_full_pipeline():
    """
    Run the whole clustering pipeline over the per-symbol universe.

    1. List tickers and load (or fetch) the sector/industry table.
    2. Compute 1y volatility deciles and beta vs SPY for every ticker.
    3. Inner-join sector, volatility and beta into one universe table.
    4. For each (sectorKey, industryKey) group with at least MIN_GROUP_SIZE
       tickers, run ``process_group`` and write any non-empty result to
       OUTPUT_DIR/cluster_{sectorKey}_{industryKey}.csv.

    Progress is printed; nothing is returned.
    """
    # 1. Universe tickers
    tickers = list_tickers(DATA_DIR)
    print("Total tickers from per_symbol:", len(tickers))

    # 2. Sector / industry. Keep one row per ticker: duplicated rows would
    # duplicate the ticker inside its group and inflate the MIN_GROUP_SIZE count.
    df_sector = get_sector_industry(tickers, sector_file=SECTOR_FILE)
    df_sector = df_sector.drop_duplicates(subset="ticker", keep="first")

    # 3. Volatility 1y
    print("Computing 1y volatility...")
    df_vol = compute_all_vols(tickers, data_dir=DATA_DIR)

    # 4. Beta vs SPY
    print("Computing beta vs SPY...")
    spy_ret = compute_spy_returns(SPY_PATH)
    df_beta = compute_all_betas(tickers, spy_ret, data_dir=DATA_DIR)

    # 5. Merged universe
    universe = (
        df_sector.merge(df_vol, on="ticker", how="inner")
        .merge(df_beta, on="ticker", how="inner")
    )

    print("Universe after merge:", universe.shape)

    # 6. One candidate cluster per (sectorKey, industryKey)
    for (sector, industry), df_group in universe.groupby(["sectorKey", "industryKey"]):
        if df_group.shape[0] < MIN_GROUP_SIZE:
            continue

        print(
            f"\nProcessing group: sector={sector}, industry={industry}, "
            f"n={df_group.shape[0]}"
        )

        df_cluster_coint = process_group(sector, industry, df_group, data_dir=DATA_DIR)

        if df_cluster_coint is None or df_cluster_coint.empty:
            print("  No final cluster for this group.")
            continue

        # The keys come from external data. Replace every character that is
        # unsafe in a file name (path separators included) with "_"; spaces
        # also become "_".
        fname = re.sub(r"[^\w.\-]", "_", f"cluster_{sector}_{industry}.csv")
        out_path = os.path.join(OUTPUT_DIR, fname)
        df_cluster_coint.to_csv(out_path, index=False)
        print(
            f"  Saved final cluster for {sector}/{industry} "
            f"with {df_cluster_coint['ticker'].nunique()} tickers "
            f"to {out_path}"
        )

In [67]:
# Execute the full pipeline. It reads the per-symbol CSVs, the SPY file and
# the sector/industry table (read from disk or fetched), then writes one CSV
# per selected (sectorKey, industryKey) group into OUTPUT_DIR.
run_full_pipeline()

Total tickers from per_symbol: 1808
Computing 1y volatility...
Computing beta vs SPY...
Universe after merge: (1576, 7)

Processing group: sector=financial-services, industry=banks-regional, n=197
  Saved final cluster for financial-services/banks-regional with 11 tickers to clusters/cluster_financial-services_banks-regional.csv

Processing group: sector=healthcare, industry=biotechnology, n=214
  Saved final cluster for healthcare/biotechnology with 2 tickers to clusters/cluster_healthcare_biotechnology.csv

Processing group: sector=technology, industry=software-application, n=61
  Saved final cluster for technology/software-application with 2 tickers to clusters/cluster_technology_software-application.csv
