In [1]:
# =========================
# Setup
# =========================

from pathlib import Path
import datetime as dt
import numpy as np
import pandas as pd

pd.set_option("display.width", 160)
pd.set_option("display.max_columns", 60)

ROOT = Path("..")
DATA = ROOT / "data"

# Input: processed comparison panel (use v3-aggregated if available)
PROC_COMPARE = DATA / "processed" / "compare"
PANEL_AGG    = PROC_COMPARE / "daily_panel_agg.parquet"
PANEL_FULL   = PROC_COMPARE / "daily_panel.parquet"

# Output: features
PROC_FEATURES = DATA / "processed" / "features"
PROC_FEATURES.mkdir(parents=True, exist_ok=True)
FEATURES_PATH = PROC_FEATURES / "daily_features.parquet"

# Events
EVENT_V3_LAUNCH = dt.date(2021, 5, 5)
EVENT_FTX       = dt.date(2022, 11, 10)

In [2]:
# =========================
# Helpers
# =========================
def _num(s, cols):
    """Coerce listed columns to numeric (inplace-safe copy returned)."""
    d = s.copy()
    for c in cols:
        if c in d.columns:
            d[c] = pd.to_numeric(d[c], errors="coerce")
    return d

def _safe_log(x, min_positive=1.0):
    """log of x with a lower bound (avoids -inf)."""
    return np.log(np.clip(x, min_positive, None))

def _clip01(x):
    return np.minimum(1.0, np.maximum(0.0, x))

In [3]:
# =========================
# Load panel
# =========================
panel_path = PANEL_AGG if PANEL_AGG.exists() else PANEL_FULL
df = pd.read_parquet(panel_path)
print(f"[loaded] {panel_path}  rows={len(df):,}  date={df['date'].min()}→{df['date'].max()}")

# Normalize a few columns to numeric
num_cols = [
    "volumeUSD", "ret",
    "eth_median_effective_gas_price_gwei",
    "proxy_chl","proxy_cs","proxy_amihud","proxy_roll",  # CEX proxies (and DEX proxies if present)
]
df = _num(df, [c for c in num_cols if c in df.columns])

[loaded] ..\data\processed\compare\daily_panel_agg.parquet  rows=67,910  date=2021-03-01→2023-02-28


In [4]:
# =========================
# v3 aggregation & v3_share
# =========================
# We compute v3_volumeUSD (sum of v3 fee tiers) and total DEX volume per label×date
dex = df.loc[df["venue_type"] == "DEX", ["label","date","venue","volumeUSD"]].copy()

# Identify v3 rows (either individual fee tiers like 'uniswap_v3_fee3000' or the synthetic 'uniswap_v3_all')
is_v3_row = dex["venue"].astype(str).str.startswith("uniswap_v3")
v3_by_day = (dex.loc[is_v3_row]
               .groupby(["label","date"], as_index=False)
               .agg(v3_volumeUSD=("volumeUSD","sum")))

dex_total = (dex
             .groupby(["label","date"], as_index=False)
             .agg(dex_total_volumeUSD=("volumeUSD","sum")))

v3_panel = dex_total.merge(v3_by_day, on=["label","date"], how="left")
v3_panel["v3_share"] = v3_panel["v3_volumeUSD"] / v3_panel["dex_total_volumeUSD"]
# If no DEX volume that day, leave as NaN; otherwise clip to [0,1]
m_pos = v3_panel["dex_total_volumeUSD"] > 0
v3_panel.loc[m_pos, "v3_share"] = _clip01(v3_panel.loc[m_pos, "v3_share"])
# Fill remaining NaNs with 0 (interpreted as "no v3 liquidity share observed")
v3_panel["v3_share_filled"] = _clip01(v3_panel["v3_share"].fillna(0.0))

# Merge v3 share back to all rows (so both DEX & CEX get the same label×date feature)
df = df.merge(v3_panel[["label","date","dex_total_volumeUSD","v3_volumeUSD","v3_share","v3_share_filled"]],
              on=["label","date"], how="left")

In [5]:
# =========================
# Core engineered features
# =========================
# Volume logs
if "volumeUSD" in df.columns:
    df["log_volumeUSD"] = _safe_log(df["volumeUSD"])
else:
    df["log_volumeUSD"] = np.nan

# Returns & transforms (if available)
if "ret" in df.columns:
    df["abs_ret"] = df["ret"].abs()
    df["ret2"]    = df["ret"] ** 2
else:
    df["abs_ret"] = np.nan
    df["ret2"]    = np.nan

# Venue flags
df["is_dex"] = (df["venue_type"] == "DEX").astype(int)
df["is_cex"] = (df["venue_type"] == "CEX").astype(int)
df["is_uniswap_v3"] = df["venue"].astype(str).str.startswith("uniswap_v3").astype(int)

# Event flags
df["post_v3"]  = (pd.to_datetime(df["date"]) >= pd.Timestamp(EVENT_V3_LAUNCH)).astype(int)
df["post_ftx"] = (pd.to_datetime(df["date"]) >= pd.Timestamp(EVENT_FTX)).astype(int)

In [6]:
# =========================
# Save features
# =========================
df = df.sort_values(["label","date","venue_type","venue"]).reset_index(drop=True)
df.to_parquet(FEATURES_PATH, index=False)
print(f"[OK] wrote features -> {FEATURES_PATH}")

[OK] wrote features -> ..\data\processed\features\daily_features.parquet


In [7]:
# =========================
# Sanity checks (quick)
# =========================
print("\n== Sanity checks ==")
n_rows   = len(df)
n_labels = df["label"].nunique()
n_venues = df["venue"].nunique()
d_min, d_max = df["date"].min(), df["date"].max()
print(f"rows: {n_rows:,}    labels: {n_labels}    venues: {n_venues}")
print(f"date range: {d_min} → {d_max}")

# v3_share range
bad_share = df.loc[(df["v3_share"].notna()) & ((df["v3_share"] < 0) | (df["v3_share"] > 1))]
print(f"v3_share out-of-range rows: {len(bad_share)}")

# v3 present before launch?
mismatch = df.loc[(pd.to_datetime(df["date"]) < pd.Timestamp(EVENT_V3_LAUNCH)) &
                  (df["v3_share"].fillna(0) > 0)]
print(f"treatment flag mismatches (v3>0 before launch): {len(mismatch)}")

# Must-have feature columns
required = [
    "log_volumeUSD", "v3_share_filled", "is_dex", "is_cex",
    "post_v3", "post_ftx", "eth_median_effective_gas_price_gwei"
]
missing = [c for c in required if c not in df.columns]
print("missing required feature columns:", missing)

# Null snapshot for a few key features
focus = [c for c in [
    "proxy_chl","proxy_roll","proxy_amihud","proxy_cs",
    "ret","abs_ret","ret2",
    "v3_share","v3_share_filled",
    "volumeUSD","log_volumeUSD",
    "eth_median_effective_gas_price_gwei"
] if c in df.columns]
if focus:
    nulls = (df[focus].isna().mean().sort_values(ascending=False) * 100).round(2).to_frame("null_pct")
    print("\n[null % of selected features]")
    display(nulls)

# Tiny v3 aggregation snapshot (first few rows where we have v3 or DEX totals)
snap_cols = ["label","date","venue","volumeUSD","dex_total_volumeUSD","v3_volumeUSD","v3_share"]
snap = (df.loc[df["dex_total_volumeUSD"].notna(), snap_cols]
          .sort_values(["label","date","venue"])
          .head(12))
print("\n[v3 aggregation snapshot]")
display(snap)


== Sanity checks ==
rows: 67,910    labels: 20    venues: 9
date range: 2021-03-01 → 2023-02-28
v3_share out-of-range rows: 0
treatment flag mismatches (v3>0 before launch): 0
missing required feature columns: []

[null % of selected features]


Unnamed: 0,null_pct
proxy_chl,88.06
proxy_cs,76.71
abs_ret,76.71
ret,76.71
ret2,76.71
proxy_roll,49.0
proxy_amihud,18.49
eth_median_effective_gas_price_gwei,17.83
v3_share,6.66
v3_share_filled,0.08



[v3 aggregation snapshot]


Unnamed: 0,label,date,venue,volumeUSD,dex_total_volumeUSD,v3_volumeUSD,v3_share
0,AAVE-ETH,2021-03-01,binance,1753.662,5086411.0,,
1,AAVE-ETH,2021-03-01,uniswap_v2,5086411.0,5086411.0,,
2,AAVE-ETH,2021-03-02,binance,2064.193,5234286.0,,
3,AAVE-ETH,2021-03-02,uniswap_v2,5234286.0,5234286.0,,
4,AAVE-ETH,2021-03-03,binance,959.8955,2994202.0,,
5,AAVE-ETH,2021-03-03,uniswap_v2,2994202.0,2994202.0,,
6,AAVE-ETH,2021-03-04,binance,657.5084,3061245.0,,
7,AAVE-ETH,2021-03-04,uniswap_v2,3061245.0,3061245.0,,
8,AAVE-ETH,2021-03-05,binance,836.3535,2221019.0,,
9,AAVE-ETH,2021-03-05,uniswap_v2,2221019.0,2221019.0,,
