# Kalshi Weather Market — Snapshot Analysis

Interactive exploration of live-collected market snapshots and orderbook data.  
All parquet files under `collector/data/` are auto-detected; new snapshots appear on re-run.

In [None]:
import re
from pathlib import Path
from datetime import datetime

import pandas as pd
import pyarrow.parquet as pq
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import ipywidgets as widgets
from IPython.display import display, Markdown

# Notebook-wide pandas display settings: wider/longer frames, 2-decimal floats.
# set_option accepts several (pattern, value) pairs in one call.
pd.set_option(
    "display.max_columns", 30,
    "display.max_rows", 100,
    "display.float_format", "{:.2f}".format,
)

## 1. Auto-detect & load all parquet files

In [None]:
# ---------- paths (auto-detect) ----------
DATA_ROOT = Path("../collector/data")
MKT_DIR   = DATA_ROOT / "market_snapshots"
OB_DIR    = DATA_ROOT / "orderbook_snapshots"
HIST_CANDLES = DATA_ROOT / "historical" / "candlesticks"
HIST_TRADES  = DATA_ROOT / "historical" / "trades"


def load_all_parquets(directory: Path) -> pd.DataFrame:
    """Read and concatenate every ``.parquet`` file directly inside *directory*.

    Files are read in sorted filename order. Each file is converted to pandas
    on its own and the frames are joined with ``pd.concat``. Files written
    with slightly different schemas (e.g. a column added by a newer collector
    run) are therefore unioned column-wise, with missing values as NaN.
    ``pyarrow.concat_tables`` would instead raise a schema-mismatch error by
    default.

    Returns an empty DataFrame when *directory* does not exist or holds no
    parquet files. The result has a fresh 0..n-1 index.
    """
    if not directory.is_dir():
        return pd.DataFrame()
    files = sorted(directory.glob("*.parquet"))
    if not files:
        return pd.DataFrame()
    frames = [pq.read_table(f).to_pandas() for f in files]
    return pd.concat(frames, ignore_index=True)


# Live-collected snapshots
mkt_df = load_all_parquets(MKT_DIR)
ob_df = load_all_parquets(OB_DIR)

# Historical (may be empty until backfill runs)
candle_df = load_all_parquets(HIST_CANDLES)
trade_df = load_all_parquets(HIST_TRADES)

# Count source files separately (globbing a missing directory yields nothing).
mkt_file_count = sum(1 for _ in MKT_DIR.glob("*.parquet"))
ob_file_count = sum(1 for _ in OB_DIR.glob("*.parquet"))

print(f"Market snapshots : {len(mkt_df):>8,} rows  from {mkt_file_count} file(s)")
print(f"Orderbook snaps  : {len(ob_df):>8,} rows  from {ob_file_count} file(s)")
print(f"Hist candlesticks: {len(candle_df):>8,} rows")
print(f"Hist trades      : {len(trade_df):>8,} rows")

## 2. Parse event tickers & enrich data

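Event ticker format: `KXHIGHCHI-26FEB11`  
- **Series** = `KXHIGHCHI` (`KXHIGH` = Kalshi high-temperature series, followed by the city code, here `CHI` = Chicago)  
- **Date code** = `26FEB11` (two-digit year, three-letter month, two-digit day) → 2026-02-11  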

We extract city, target date, and a human-friendly label for every event.

In [None]:
CITY_CODES = {
    "CHI": "Chicago",
    "NY":  "New York",
    "MIA": "Miami",
    "DEN": "Denver",
    "AUS": "Austin",
    "HOU": "Houston",
    "PHL": "Philadelphia",
}

_EVENT_RE = re.compile(
    r"^(?P<series>KXHIGH(?P<city>[A-Z]+))"
    r"-(?P<yy>\d{2})(?P<mon>[A-Z]{3})(?P<dd>\d{2})$"
)

# Upper-case month abbreviations as they appear in event tickers -> month
# number. This explicit table replaces strptime("%b"), whose month names
# depend on the process locale.
_MONTHS = {
    abbr: num
    for num, abbr in enumerate(
        ("JAN", "FEB", "MAR", "APR", "MAY", "JUN",
         "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"),
        start=1,
    )
}


def _unparsed_event(ticker) -> dict:
    """Fallback metadata for a ticker that cannot be parsed: echo it back."""
    return {"series": ticker, "city_code": "", "city": ticker,
            "target_date": None, "event_label": ticker}


def parse_event_ticker(ticker: str) -> dict:
    """Extract structured fields from an event ticker such as ``KXHIGHCHI-26FEB11``.

    Returns a dict with keys ``series``, ``city_code``, ``city`` (full name
    from ``CITY_CODES``, else the raw code), ``target_date`` (a
    ``datetime.date``) and ``event_label`` (e.g. ``"Chicago — Feb 11, 2026"``).

    The fallback dict (the ticker itself as series/city/label, empty
    city_code, ``target_date=None``) is returned in these cases:

    * the ticker does not match the expected pattern;
    * the date part is not a real calendar date (e.g. ``26FEB30``);
    * the input is not a string (e.g. NaN from pandas, or ``None`` from an
      empty dropdown).

    This function never raises for bad input.
    """
    if not isinstance(ticker, str):
        return _unparsed_event(ticker)
    m = _EVENT_RE.match(ticker)
    if not m:
        return _unparsed_event(ticker)
    month = _MONTHS.get(m.group("mon"))
    if month is None:
        return _unparsed_event(ticker)
    try:
        # Two-digit year code is interpreted as 20YY.
        target_date = datetime(
            2000 + int(m.group("yy")), month, int(m.group("dd"))
        ).date()
    except ValueError:
        # Structurally valid but impossible date, e.g. day 30 in February.
        return _unparsed_event(ticker)
    city_code = m.group("city")
    city = CITY_CODES.get(city_code, city_code)
    label = f"{city} — {target_date:%b %d, %Y}"
    return {
        "series": m.group("series"),
        "city_code": city_code,
        "city": city,
        "target_date": target_date,
        "event_label": label,
    }


def enrich_market_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* with parsed event metadata and derived quote columns added.

    Adds the fields produced by ``parse_event_ticker`` for each row's
    ``event_ticker`` (``series``, ``city_code``, ``city``, ``target_date``,
    ``event_label``), plus:

    * ``mid_price`` = (``yes_bid`` + ``yes_ask``) / 2
    * ``spread``    = ``yes_ask`` - ``yes_bid``

    Both derived columns stay in the same units as the quote columns (cents,
    per the notebook's display labels); no conversion to probability is done.
    An empty input is returned unchanged. The input frame is not modified.
    """
    if df.empty:
        return df
    # Build the metadata frame straight from the list of dicts, reusing df's
    # index so the column-wise concat aligns row for row. This is much faster
    # than ``.map(...).apply(pd.Series)``.
    parsed = pd.DataFrame(
        df["event_ticker"].map(parse_event_ticker).tolist(), index=df.index
    )
    out = pd.concat([df, parsed], axis=1)
    out["mid_price"] = (out["yes_bid"] + out["yes_ask"]) / 2
    out["spread"] = out["yes_ask"] - out["yes_bid"]
    return out


# Enriched market snapshots used by every cell below.
mkt = enrich_market_df(mkt_df)

if not mkt.empty:
    display(Markdown("### Detected events"))
    # One row per event: contract count, snapshot count, and snapshot time range.
    grouped = mkt.groupby(["event_ticker", "event_label"])
    summary = grouped.agg(
        contracts=("market_ticker", "nunique"),
        snapshots=("snapshot_ts", "nunique"),
        first_snap=("snapshot_ts", "min"),
        last_snap=("snapshot_ts", "max"),
    ).reset_index()
    display(summary)

## 3. Event selector

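Choose which event to explore. Downstream cells read the current selection each time they run; changing the dropdown does not re-execute them, so re-run the cells below after switching events.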

In [None]:
# Sorted list of every event ticker present in the market data.
event_tickers = [] if mkt.empty else sorted(mkt["event_ticker"].unique())

# Each option shows "<ticker>  (<label>)" but carries the raw ticker as its value.
_event_options = [
    (f"{tk}  ({parse_event_ticker(tk)['event_label']})", tk)
    for tk in event_tickers
]

event_dropdown = widgets.Dropdown(
    options=_event_options,
    description="Event:",
    style={"description_width": "60px"},
    layout=widgets.Layout(width="500px"),
)
display(event_dropdown)


def selected_event() -> str:
    """Return the event ticker currently chosen in ``event_dropdown``.

    NOTE(review): ipywidgets reports ``None`` when the dropdown has no
    options (no market data loaded); callers should tolerate that.
    """
    chosen = event_dropdown.value
    return chosen


def event_mkt() -> pd.DataFrame:
    """Return a copy of the market-snapshot rows for the selected event.

    Rows are selected where ``event_ticker`` equals ``selected_event()``.
    When ``mkt`` is empty (e.g. no parquet files were found, so it has no
    ``event_ticker`` column), an empty copy is returned instead of raising
    ``KeyError``.
    """
    if mkt.empty:
        return mkt.copy()
    return mkt[mkt["event_ticker"] == selected_event()].copy()


def event_ob() -> pd.DataFrame:
    """Return a copy of the orderbook rows for the selected event's contracts.

    Orderbook rows are matched on ``market_ticker`` against the contracts
    that appear in the selected event's market snapshots. When either
    ``mkt`` or ``ob_df`` is empty (e.g. no parquet files were found), a
    zero-row copy of ``ob_df`` is returned instead of raising ``KeyError``.
    """
    if mkt.empty or ob_df.empty:
        return ob_df.iloc[0:0].copy()
    tickers = event_mkt()["market_ticker"].unique()
    return ob_df[ob_df["market_ticker"].isin(tickers)].copy()

## 4. Snapshot summary for selected event

In [None]:
em = event_mkt()
display(Markdown(f"### {selected_event()}  —  {parse_event_ticker(selected_event())['event_label']}"))
display(Markdown(f"**{em['snapshot_ts'].nunique()}** snapshots · "
                 f"**{em['market_ticker'].nunique()}** contracts · "
                 f"time range: `{em['snapshot_ts'].min()}` → `{em['snapshot_ts'].max()}`"))
display(Markdown("---"))

# Latest snapshot for each contract: keep each contract's most recent complete
# row. groupby().last() would instead take the last *non-null* value of every
# column independently, silently mixing fields from older snapshots whenever
# the newest row has gaps.
latest = (
    em.sort_values("snapshot_ts", kind="stable")
    .drop_duplicates("market_ticker", keep="last")
    .reset_index(drop=True)
)
latest_display = latest[[
    "market_ticker", "subtitle", "yes_bid", "yes_ask", "mid_price",
    "spread", "last_price", "volume", "open_interest",
]].sort_values("mid_price", ascending=False)

display(Markdown("#### Latest contract prices (cents = implied probability %)"))
display(latest_display.style.format({
    "mid_price": "{:.1f}¢",
    "spread": "{:.0f}¢",
    "volume": "{:,.0f}",
    "open_interest": "{:,.0f}",
}).bar(subset=["mid_price"], color="#5fba7d", vmin=0, vmax=100))

## 5. Price evolution over time

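Shows `mid_price` (average of `yes_bid` and `yes_ask`) and the bid-ask `spread` for every contract across all snapshots. When only one snapshot exists, the current mid prices are shown as a bar chart instead.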

In [None]:
em = event_mkt()

if em.empty:
    # Nothing to plot; px.bar/px.line would fail on a frame without columns.
    display(Markdown("> No market data for this event."))
elif em["snapshot_ts"].nunique() < 2:
    display(Markdown("> **Only 1 snapshot available** — price evolution chart will be "
                     "more useful once more snapshots are collected.  "
                     "Showing current values as a bar chart instead."))
    fig = px.bar(
        em.sort_values("mid_price", ascending=False),
        x="subtitle", y="mid_price",
        color="subtitle",
        title=f"Current implied probabilities — {selected_event()}",
        labels={"mid_price": "Mid price (¢ = implied %)", "subtitle": "Contract"},
    )
    fig.update_layout(showlegend=False, yaxis_range=[0, 100])
    fig.show()
else:
    # Mid price per contract over time
    fig = px.line(
        em.sort_values("snapshot_ts"),
        x="snapshot_ts", y="mid_price",
        color="subtitle",
        title=f"Mid-price evolution — {selected_event()}",
        labels={"mid_price": "Mid price (¢)", "snapshot_ts": "Time (UTC)"},
        markers=True,
    )
    fig.update_layout(
        yaxis_range=[0, 100],
        hovermode="x unified",
        legend_title_text="Contract",
    )
    fig.show()

    # Bid-ask spread over time
    fig2 = px.line(
        em.sort_values("snapshot_ts"),
        x="snapshot_ts", y="spread",
        color="subtitle",
        title=f"Bid-ask spread over time — {selected_event()}",
        labels={"spread": "Spread (¢)", "snapshot_ts": "Time (UTC)"},
        markers=True,
    )
    fig2.update_layout(hovermode="x unified", legend_title_text="Contract")
    fig2.show()

## 6. Volume & open interest

In [None]:
em = event_mkt()

if em.empty:
    display(Markdown("> No market data for this event."))
elif em["snapshot_ts"].nunique() >= 2:
    # Time series: cumulative volume (top) and open interest (bottom) per contract.
    fig = make_subplots(
        rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08,
        subplot_titles=("Cumulative volume", "Open interest"),
    )
    for sub in em["subtitle"].unique():
        s = em[em["subtitle"] == sub].sort_values("snapshot_ts")
        fig.add_trace(go.Scatter(
            x=s["snapshot_ts"], y=s["volume"], name=sub, mode="lines+markers",
            legendgroup=sub,
        ), row=1, col=1)
        # Same legend group as the volume trace so one legend click toggles both.
        fig.add_trace(go.Scatter(
            x=s["snapshot_ts"], y=s["open_interest"], name=sub, mode="lines+markers",
            legendgroup=sub, showlegend=False,
        ), row=2, col=1)
    fig.update_layout(
        height=600, title_text=f"Volume & OI — {selected_event()}",
        hovermode="x unified",
    )
    fig.show()
else:
    # Single snapshot: horizontal bars from each contract's most recent full
    # row. drop_duplicates(keep="last") keeps whole rows, whereas
    # groupby().last() combines per-column last non-null values.
    latest = (
        em.sort_values("snapshot_ts", kind="stable")
        .drop_duplicates("market_ticker", keep="last")
        .reset_index(drop=True)
    )
    fig = px.bar(
        latest.sort_values("volume", ascending=True),
        y="subtitle", x="volume", orientation="h",
        color="subtitle",
        title=f"Volume by contract — {selected_event()}",
        labels={"volume": "Volume (contracts)", "subtitle": ""},
    )
    fig.update_layout(showlegend=False)
    fig.show()

    fig2 = px.bar(
        latest.sort_values("open_interest", ascending=True),
        y="subtitle", x="open_interest", orientation="h",
        color="subtitle",
        title=f"Open interest by contract — {selected_event()}",
        labels={"open_interest": "Open interest", "subtitle": ""},
    )
    fig2.update_layout(showlegend=False)
    fig2.show()

## 7. Orderbook depth (latest snapshot)

In [None]:
eob = event_ob()

if eob.empty:
    display(Markdown("> No orderbook data for this event."))
else:
    # Keep only rows from each contract's most recent orderbook snapshot
    # (inner merge on the per-contract max snapshot_ts).
    latest_ts = eob.groupby("market_ticker")["snapshot_ts"].max().reset_index()
    eob_latest = eob.merge(latest_ts, on=["market_ticker", "snapshot_ts"])

    contracts = sorted(eob_latest["market_ticker"].unique())
    n_contracts = len(contracts)
    cols = min(n_contracts, 3)
    rows = (n_contracts + cols - 1) // cols  # ceil(n_contracts / cols)

    # make_subplots raises ValueError when vertical_spacing > 1 / (rows - 1),
    # which a fixed 0.12 hits at 10+ rows. Cap the total gap at half the
    # figure height; for up to 5 rows the original 0.12 is unchanged.
    v_space = 0.12 if rows < 2 else min(0.12, 0.5 / (rows - 1))

    # Map market_ticker -> subtitle for readability
    if not mkt.empty:
        sub_map = mkt.drop_duplicates("market_ticker").set_index("market_ticker")["subtitle"].to_dict()
    else:
        sub_map = {}

    fig = make_subplots(
        rows=rows, cols=cols,
        subplot_titles=[sub_map.get(c, c) for c in contracts],
        horizontal_spacing=0.08, vertical_spacing=v_space,
    )

    for idx, contract in enumerate(contracts):
        r, c = divmod(idx, cols)
        cdf = eob_latest[eob_latest["market_ticker"] == contract]

        # NOTE(review): yes and no levels are drawn on one shared price axis;
        # confirm this matches how the collector records `price_cents` per side.
        for side, color in [("yes", "#2ca02c"), ("no", "#d62728")]:
            sdf = cdf[cdf["side"] == side].sort_values("price_cents")
            if sdf.empty:
                continue
            # Legend entry only from the first subplot; legendgroup links the rest.
            fig.add_trace(go.Bar(
                x=sdf["price_cents"], y=sdf["quantity"],
                name=f"{side}", marker_color=color,
                legendgroup=side, showlegend=(idx == 0),
            ), row=r+1, col=c+1)

    fig.update_layout(
        height=300 * rows, barmode="group",
        title_text=f"Orderbook depth — {selected_event()} (latest snapshot)",
    )
    fig.update_xaxes(title_text="Price (¢)")
    fig.update_yaxes(title_text="Quantity")
    fig.show()

## 8. Cross-event comparison

Compare the latest implied probability distribution across **all** events.  
Each event is a separate facet; the x-axis is the temperature bucket.

In [None]:
if mkt.empty:
    display(Markdown("> No market data loaded."))
else:
    # Latest snapshot per event × contract: each pair's most recent complete
    # row (groupby().last() would mix per-column last non-null values).
    latest_all = (
        mkt.sort_values("snapshot_ts", kind="stable")
        .drop_duplicates(["event_ticker", "market_ticker"], keep="last")
        .reset_index(drop=True)
    )

    # Sort subtitles by the numeric lower bound for a natural temperature axis
    def _sort_key(subtitle) -> float:
        """Return the first number in a bucket subtitle as its sort key.

        '42° to 43°' -> 42.0, '44° or above' -> 44.0, '-3° or below' -> -3.0.
        The optional leading minus keeps sub-zero buckets below positive ones.
        Subtitles without any number map to 0.0; non-string values (e.g. NaN)
        sort last instead of raising TypeError.
        """
        if not isinstance(subtitle, str):
            return float("inf")
        nums = re.findall(r"-?\d+(?:\.\d+)?", subtitle)
        return float(nums[0]) if nums else 0.0

    latest_all["_sort"] = latest_all["subtitle"].map(_sort_key)
    latest_all = latest_all.sort_values(["event_label", "_sort"])

    # Category order shared by all facets: ascending bucket value across every
    # event. First-appearance order would interleave different cities' ranges.
    bucket_order = (
        latest_all.sort_values("_sort", kind="stable")["subtitle"]
        .dropna()
        .unique()
        .tolist()
    )

    n_events = latest_all["event_ticker"].nunique()

    fig = px.bar(
        latest_all,
        x="subtitle", y="mid_price",
        color="subtitle",
        facet_col="event_label",
        facet_col_wrap=min(n_events, 3),
        title="Implied probability distribution — all events",
        labels={"mid_price": "Mid price (¢ ≈ %)", "subtitle": "Temp range"},
        category_orders={"subtitle": bucket_order},
    )
    fig.update_layout(
        showlegend=False,
        height=400 * ((n_events + 2) // 3),  # 400px per facet row (3 per row)
        yaxis_range=[0, 100],
    )
    # Facet titles default to "event_label=<value>"; keep only the value.
    fig.for_each_annotation(lambda a: a.update(text=a.text.split("=")[-1]))
    fig.show()

## 9. Snapshot cadence & data health

In [None]:
if mkt.empty:
    display(Markdown("> No data."))
else:
    # One row per (snapshot time, event), i.e. each distinct collection pass.
    snap_times = (
        mkt.drop_duplicates(subset=["snapshot_ts", "event_ticker"])
        .sort_values("snapshot_ts")
    )

    for evt in snap_times["event_ticker"].unique():
        s = snap_times[snap_times["event_ticker"] == evt]["snapshot_ts"].sort_values()
        deltas = s.diff().dropna().dt.total_seconds()
        info = parse_event_ticker(evt)
        if deltas.empty:
            # Fewer than two snapshots: no intervals exist, so the mean/median/
            # min/max would all render as "nan".
            interval_line = "- **Interval:** n/a (fewer than 2 snapshots)"
        else:
            interval_line = (
                f"- **Interval — mean:** {deltas.mean():.1f}s, "
                f"**median:** {deltas.median():.1f}s, "
                f"**min:** {deltas.min():.1f}s, "
                f"**max:** {deltas.max():.1f}s"
            )
        display(Markdown(f"### {evt} — {info['event_label']}"))
        display(Markdown(
            f"- **Snapshots:** {len(s)}\n"
            f"- **Time span:** {s.min()} → {s.max()}\n"
            + interval_line
        ))

    # Timeline dot plot — one dot per snapshot
    fig = px.strip(
        snap_times,
        x="snapshot_ts", y="event_label",
        color="event_label",
        title="Snapshot timeline",
        labels={"snapshot_ts": "UTC", "event_label": ""},
    )
    fig.update_traces(marker_size=5)
    # Height scales with the number of event rows plotted here, computed from
    # this cell's own data rather than a variable from another cell.
    fig.update_layout(
        showlegend=False,
        height=200 + 60 * snap_times["event_ticker"].nunique(),
    )
    fig.show()

## 10. Historical candlesticks & trades (if backfilled)

In [None]:
if not (candle_df.empty and trade_df.empty):
    # At least one historical dataset exists; render whichever is present.
    if not candle_df.empty:
        display(Markdown("### Candlestick data"))
        display(Markdown(f"Rows: {len(candle_df):,}  |  Events: {candle_df['event_ticker'].nunique()}"))

        # One OHLC chart per contract, grouped by event in first-seen order.
        for evt in candle_df["event_ticker"].unique():
            evt_rows = candle_df.loc[candle_df["event_ticker"] == evt]
            for tk in evt_rows["market_ticker"].unique():
                bars = evt_rows.loc[evt_rows["market_ticker"] == tk].sort_values("timestamp")
                candle = go.Candlestick(
                    x=bars["timestamp"],
                    open=bars["open_price"],
                    high=bars["high_price"],
                    low=bars["low_price"],
                    close=bars["close_price"],
                )
                fig = go.Figure(candle)
                fig.update_layout(title=f"{tk}", xaxis_title="Time", yaxis_title="Price")
                fig.show()

    if not trade_df.empty:
        display(Markdown("### Trade data"))
        display(Markdown(f"Rows: {len(trade_df):,}  |  Events: {trade_df['event_ticker'].nunique()}"))
        display(trade_df.head(20))
else:
    # Neither dataset loaded: point the user at the backfill script.
    display(Markdown(
        "> No historical data yet.  Run the backfill script to populate:\n"
        "> ```bash\n"
        "> pred_env/bin/python pred_market_src/collector/backfill.py --start 2026-02-01\n"
        "> ```"
    ))

## 11. Raw data explorer

Quick peek at the raw dataframes for debugging.

In [None]:
# Show a heading and the first 20 rows of each raw dataframe, in order.
for _heading, _frame in (
    ("### Market snapshots (first 20 rows)", mkt),
    ("### Orderbook snapshots (first 20 rows)", ob_df),
):
    display(Markdown(_heading))
    display(_frame.head(20))