# 28 – Trade Execution & Reconciliation (updated)

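This notebook version computes `slippage_dollars` primarily as **signal-to-fill** slippage: each fill is matched to your master trades file by `plan_id`, and its fill price is compared with the planned signal price (falling back to the order price when no signal price is found).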


In [None]:
#!/usr/bin/env python3
"""
28 – Trade Execution & Reconciliation (Robust, Plan-Aware, Append-Only)

Fix: Defensive ingestion for broker_fills_manual.csv.
- Normalizes column names (case/spacing) and maps common broker aliases
- Ensures REQUIRED_FILL_COLS exist (creates safe defaults where possible)
- Validates hard-required execution truth fields
- Keeps executed_trades.csv append-only + idempotent via broker_order_id

CHANGE (signal-to-fill slippage):
- slippage_dollars is now computed vs *signal price* (from your master trades file) when available:
    BUY  : (fill_price - signal_price) * shares
    SELL : (signal_price - fill_price) * shares
  (positive = cost/worse, negative = improvement/better)
- Falls back to order_price-based slippage only if signal price is missing.
"""

# ============================================================
# Imports
# ============================================================

import os
import re
import pandas as pd
import numpy as np
from datetime import datetime
from pandas.errors import EmptyDataError

# ============================================================
# Configuration
# ============================================================

LIVE_ROOT = "./27a-2G_live_trading"

LIVE_PORTFOLIO_FILE   = f"{LIVE_ROOT}/live_portfolio.csv"          # AUTHORITATIVE STATE
MANUAL_FILLS_FILE     = f"{LIVE_ROOT}/broker_fills_manual.csv"     # HUMAN / BROKER INPUT
EXECUTED_TRADES_FILE  = f"{LIVE_ROOT}/executed_trades.csv"         # APPEND-ONLY LEDGER
RECON_LOG_FILE        = f"{LIVE_ROOT}/reconciliation_log.csv"      # DIAGNOSTICS ONLY

# ---- Master trades file (for signal price by plan_id) ----
# Derived from LIVE_ROOT like the other paths, so relocating the live folder
# cannot leave the signal-price lookup pointing at the old location.
MASTER_TRADES_FILE    = f"{LIVE_ROOT}/master_trades.csv"           # .csv or .parquet
MASTER_PLAN_ID_COL    = "plan_id"
MASTER_SIGNAL_PX_COL  = "est_exec_px"   # <-- change if your master file uses a different name

os.makedirs(LIVE_ROOT, exist_ok=True)
INITIAL_CASH = 342000   # only used to bootstrap live_portfolio.csv when it is missing/empty

# ============================================================
# Required Fill Columns (Minimum Execution Truth)
# ============================================================

# Every column the reconciliation reads from a fill row. Any that are absent
# after alias mapping are created with defaults by normalize_and_map_columns().
REQUIRED_FILL_COLS = {
    # link back to the plan / signal that produced the order
    "plan_id", "signal_date",
    # what the broker actually did
    "exec_date", "ticker", "side", "shares_filled",
    "order_type", "order_price", "fill_price", "broker_fee",
    # idempotency key for executed_trades.csv
    "broker_order_id",
}

# Subset that cannot be defaulted: without these a fill cannot be applied to
# cash/positions or de-duplicated against the ledger.
HARD_REQUIRED = {"broker_order_id", "fill_price", "shares_filled", "side", "ticker"}

# ============================================================
# Helpers: Column normalization + alias mapping
# ============================================================

def _snake(s: str) -> str:
    """Normalize headers to snake_case-ish."""
    s = str(s).strip().lower()
    s = re.sub(r"[^\w]+", "_", s)          # spaces, hyphens, etc -> underscore
    s = re.sub(r"_+", "_", s).strip("_")
    return s

# Canonical column -> common aliases you might see in broker exports / manual entry.
# NOTE(review): several aliases are very generic ("id", "time", "price", "type",
# "limit", "size"); a broker export that uses one of them for a different field
# will be mapped silently, so check the "Renamed columns" printout after ingestion.
COL_ALIASES = {
    "plan_id":        ["plan", "strategy_id", "signal_id", "trade_plan_id"],
    "signal_date":    ["signal_dt", "signal_time", "signal_timestamp", "signal_datetime", "signaldate"],
    "exec_date":      ["execution_date", "execution_dt", "exec_dt", "fill_date", "filled_at", "timestamp", "time", "datetime"],
    "ticker":         ["symbol", "underlying", "asset", "security", "instrument"],
    "side":           ["buy_sell", "b_s", "direction", "action", "bs"],
    "shares_filled":  ["qty", "quantity", "filled_qty", "filled_quantity", "shares", "filled_shares", "size", "units"],
    "order_type":     ["type", "ord_type", "orderstyle", "order_style"],
    "order_price":    ["limit_price", "limit", "lmt_price", "order_limit_price", "ref_price", "requested_price"],
    "fill_price":     ["price", "avg_price", "average_price", "execution_price", "fill", "fill_px", "fillprice"],
    "broker_fee":     ["commission", "commissions", "fees", "fee", "broker_commission"],
    "broker_order_id":["order_id", "id", "brokerid", "broker_orderid", "orderid", "trade_id", "execution_id"],
}

def normalize_and_map_columns(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
    """
    Normalize fill-file headers, map broker aliases to canonical names, and
    back-fill any missing REQUIRED_FILL_COLS with safe defaults.

    The input frame is not modified; all work happens on a copy.

    Returns (df, mapping_info) where mapping_info shows what happened:
        "renamed": {normalized_header: canonical_name} for aliases that were renamed
        "created": canonical columns added with default values (sorted order)
        "ignored": {normalized_header: canonical_name} for aliases left untouched
                   because that canonical column already existed or had already
                   been claimed by an earlier alias column

    Raises:
        ValueError: if two headers normalize to the same canonical/alias name
            (e.g. "Fill Price" and "fill_price"), since there is no safe way to
            choose between them.
    """
    mapping_info = {"renamed": {}, "created": [], "ignored": {}}

    # Work on a copy so the caller's raw frame keeps its original headers.
    df = df.copy()
    df.columns = [_snake(c) for c in df.columns]

    # Build reverse lookup: alias -> canonical (canonical names map to themselves)
    alias_to_canon = {}
    for canon, aliases in COL_ALIASES.items():
        alias_to_canon[_snake(canon)] = canon
        for a in aliases:
            alias_to_canon[_snake(a)] = canon

    # Duplicate headers for a column we rely on would make df[col] return a
    # DataFrame instead of a Series; refuse rather than guess.
    dup_mask = df.columns.duplicated()
    if dup_mask.any():
        clashing = sorted(
            {c for c in df.columns[dup_mask] if c in alias_to_canon or c in REQUIRED_FILL_COLS}
        )
        if clashing:
            raise ValueError(f"Ambiguous fill columns after header normalization: {clashing}")

    # Rename alias columns to their canonical name. The first alias (in column
    # order) claims a canonical name; later aliases for it are left as-is, so two
    # aliases (e.g. "order_id" and "trade_id") never both become the same column.
    taken = set(df.columns)
    rename_map = {}
    for c in df.columns:
        canon = alias_to_canon.get(c)
        if canon is None or c == canon:
            continue
        if canon in taken:
            mapping_info["ignored"][c] = canon
            continue
        rename_map[c] = canon
        taken.add(canon)

    if rename_map:
        df = df.rename(columns=rename_map)
        mapping_info["renamed"] = dict(rename_map)

    # Ensure all REQUIRED_FILL_COLS exist (create safe defaults if missing)
    defaults = {
        "plan_id": "",
        "signal_date": pd.NaT,
        "exec_date": pd.NaT,
        "order_type": "MKT",
        "order_price": np.nan,
        "broker_fee": 0.0,
    }
    # sorted() gives a deterministic column order; set iteration order is not.
    for col in sorted(REQUIRED_FILL_COLS):
        if col not in df.columns:
            df[col] = defaults.get(col, np.nan)
            mapping_info["created"].append(col)

    return df, mapping_info

def normalize_side(x: str) -> str:
    """
    Map a broker side token to "BUY" or "SELL".

    The token is stringified, trimmed and upper-cased first. Unrecognized tokens
    are returned in that cleaned form so downstream validation can reject them.
    """
    token = str(x).strip().upper()
    canonical = {
        "B": "BUY", "BUY": "BUY", "BOT": "BUY", "LONG": "BUY",
        "S": "SELL", "SELL": "SELL", "SLD": "SELL", "SHORT": "SELL",
    }
    return canonical.get(token, token)

def _clean_plan_id(x) -> str:
    """Plan id as a stable string key (empty string if missing)."""
    if x is None:
        return ""
    s = str(x).strip()
    if s.lower() in {"nan", "none"}:
        return ""
    return s

# ============================================================
# Signal price map (plan_id -> signal_price)
# ============================================================

def load_signal_price_map(path: str) -> dict:
    """
    Load a master trades file (.parquet or CSV) and return {plan_id(str): signal_price(float)}.

    - A missing file, an empty CSV, or missing plan-id / signal-price columns -> {}.
    - Rows with a blank plan_id or a non-numeric signal price are ignored.
      Blank ids matter: they clean to "", and an "" key would hand some
      arbitrary signal price to every fill that has no plan_id.
    - If a plan_id appears more than once, its last valid row (file order) wins.
    """
    if not os.path.exists(path):
        return {}

    try:
        df = pd.read_parquet(path) if path.lower().endswith(".parquet") else pd.read_csv(path)
    except EmptyDataError:
        return {}

    if MASTER_PLAN_ID_COL not in df.columns or MASTER_SIGNAL_PX_COL not in df.columns:
        return {}

    plan_ids = df[MASTER_PLAN_ID_COL].apply(_clean_plan_id)
    prices = pd.to_numeric(df[MASTER_SIGNAL_PX_COL], errors="coerce")
    usable = (plan_ids != "") & prices.notna()

    # groupby().last() keeps the last row per plan_id (file order is preserved)
    return prices[usable].groupby(plan_ids[usable]).last().to_dict()

signal_px_map = load_signal_price_map(MASTER_TRADES_FILE)

# An empty map is not fatal: each fill's slippage then uses its order_price.
if not signal_px_map:
    print(
        "WARNING: No signal price map loaded (missing file or columns). "
        "Slippage will fall back to order_price where available."
    )

# ============================================================
# Load Manual Fills (Defensive Ingestion)
# ============================================================

if not os.path.exists(MANUAL_FILLS_FILE):
    raise FileNotFoundError(f"Missing broker fills file: {MANUAL_FILLS_FILE}")

fills_raw = pd.read_csv(MANUAL_FILLS_FILE)
fills, ingest_info = normalize_and_map_columns(fills_raw)

# Optional: show what happened (helps you debug broker export changes)
if ingest_info["renamed"] or ingest_info["created"]:
    print("Ingestion mapping:")
    if ingest_info["renamed"]:
        print("  Renamed columns:", ingest_info["renamed"])
    if ingest_info["created"]:
        print("  Created missing required cols w/ defaults:", ingest_info["created"])

# Validate hard-required truth fields straight after mapping.
# normalize_and_map_columns() back-fills every REQUIRED_FILL_COLS entry (a
# superset of HARD_REQUIRED) with NaN defaults, so "column present" alone proves
# nothing. A hard-required column that had to be *created* is really missing from
# the broker file; without this check every row would be silently dropped below.
missing_hard = [
    c for c in HARD_REQUIRED
    if c not in fills.columns or c in ingest_info["created"]
]
if missing_hard:
    raise ValueError(f"Still missing hard-required columns after mapping: {sorted(missing_hard)}")

# Coerce dates
fills["signal_date"] = pd.to_datetime(fills["signal_date"], errors="coerce")
fills["exec_date"] = pd.to_datetime(fills["exec_date"], errors="coerce")

# Normalize fields / types.
# fillna("") before astype(str): otherwise a blank cell becomes the literal
# "nan"/"NAN" string and slips past the emptiness checks below.
fills["plan_id"] = fills["plan_id"].apply(_clean_plan_id)
fills["ticker"] = fills["ticker"].fillna("").astype(str).str.strip().str.upper()
fills["side"] = fills["side"].apply(normalize_side)

# Quantities: non-finite values become NaN (-> 0 -> invalid). Fractional share
# counts are flagged as invalid instead of being silently truncated by astype(int).
qty = pd.to_numeric(fills["shares_filled"], errors="coerce")
qty = qty.where(np.isfinite(qty))
fractional_qty = qty.notna() & (qty != qty.round())
fills["shares_filled"] = qty.fillna(0).astype(int)

fills["order_type"] = fills["order_type"].fillna("").astype(str).str.strip().str.upper()
fills["order_price"] = pd.to_numeric(fills["order_price"], errors="coerce")
fills["fill_price"] = pd.to_numeric(fills["fill_price"], errors="coerce")

fills["broker_fee"] = pd.to_numeric(fills["broker_fee"], errors="coerce").fillna(0.0)
fills["broker_order_id"] = fills["broker_order_id"].fillna("").astype(str).str.strip()

invalid = (
    (fills["ticker"] == "") |
    (~fills["side"].isin(["BUY", "SELL"])) |
    (fills["shares_filled"] <= 0) |
    fractional_qty |
    (~np.isfinite(fills["fill_price"])) | (fills["fill_price"] <= 0) |
    (fills["broker_order_id"] == "")
)
if invalid.any():
    # Report rejected rows (index = data row position in the fills file).
    print(f"WARNING: skipping {int(invalid.sum())} invalid fill row(s):")
    print(
        fills.loc[invalid, ["broker_order_id", "ticker", "side", "shares_filled", "fill_price"]]
        .to_string()
    )
fills = fills[~invalid].copy()

# broker_order_id de-duplicates against the ledger across runs, but repeats
# *within* this file are all applied; surface them so accidental copies get noticed.
dup_ids = sorted(fills.loc[fills["broker_order_id"].duplicated(), "broker_order_id"].unique())
if dup_ids:
    print("WARNING: broker_order_id repeated within fills file (every row is applied):", dup_ids)

# Deterministic replay order; NaT exec_date sorts first via a 1970 placeholder
fills["_exec_sort"] = fills["exec_date"].fillna(pd.Timestamp("1970-01-01"))
fills = fills.sort_values(["_exec_sort", "ticker", "side", "broker_order_id"]).drop(columns=["_exec_sort"]).reset_index(drop=True)

# ============================================================
# Load Live Portfolio (Authoritative Input) — robust bootstrap
# ============================================================

def init_live_portfolio(path: str, cash: float) -> pd.DataFrame:
    """
    Write a fresh, cash-only live portfolio to `path` and return it.

    The single row carries the current timestamp, `cash` as a float, an empty
    ticker and zero shares.
    """
    seed_row = {
        "asof_ts": datetime.now().isoformat(timespec="seconds"),
        "cash": float(cash),
        "ticker": "",
        "shares": 0,
    }
    frame = pd.DataFrame([seed_row])
    frame.to_csv(path, index=False)
    return frame

if not os.path.exists(LIVE_PORTFOLIO_FILE) or os.path.getsize(LIVE_PORTFOLIO_FILE) == 0:
    portfolio = init_live_portfolio(LIVE_PORTFOLIO_FILE, INITIAL_CASH)
else:
    try:
        portfolio = pd.read_csv(LIVE_PORTFOLIO_FILE)
    except EmptyDataError:
        portfolio = init_live_portfolio(LIVE_PORTFOLIO_FILE, INITIAL_CASH)

if portfolio.empty:
    portfolio = init_live_portfolio(LIVE_PORTFOLIO_FILE, INITIAL_CASH)

needed_port_cols = {"cash", "ticker", "shares"}
missing_port = needed_port_cols - set(portfolio.columns)
if missing_port:
    raise ValueError(f"live_portfolio.csv missing required columns: {sorted(missing_port)}")

# The portfolio writer repeats the cash balance on every row; read it from row 0.
# A blank cash cell must not silently turn every later cash figure into NaN.
cash_raw = portfolio.iloc[0]["cash"]
if pd.isna(cash_raw):
    raise ValueError("live_portfolio.csv has no cash value in its first row")
cash = float(cash_raw)

positions = {}
for _, r in portfolio.iterrows():
    raw_ticker = r.get("ticker", "")
    # Blank cells read back as NaN; str(NaN) would otherwise yield the ticker "NAN".
    t = "" if pd.isna(raw_ticker) else str(raw_ticker).strip().upper()
    raw_shares = r.get("shares", 0)
    # int(NaN) raises, so a blank share count is treated as 0; garbage still raises.
    sh = 0 if pd.isna(raw_shares) else int(float(raw_shares))
    if t and sh > 0:
        if t in positions:
            raise ValueError(f"live_portfolio.csv lists {t} more than once")
        positions[t] = sh

# ============================================================
# Idempotency: Skip Already Processed Fills
# ============================================================

def _order_id_key(value) -> str:
    """
    Comparison key for broker_order_id values.

    Purely numeric ids are reduced to their integer text, so "00123", "123" and
    "123.0" (forms pandas dtype inference can give the same id depending on
    whether the column contains blanks) compare equal. Any other id is compared
    as its stripped string.
    NOTE(review): assumes distinct broker ids never differ only by leading zeros.
    """
    s = str(value).strip()
    m = re.fullmatch(r"(\d+)(?:\.0+)?", s)
    return str(int(m.group(1))) if m else s


already = set()
if os.path.exists(EXECUTED_TRADES_FILE):
    try:
        # Read ids as text so they are not re-typed by dtype inference.
        prior_exec = pd.read_csv(EXECUTED_TRADES_FILE, dtype={"broker_order_id": str})
    except EmptyDataError:
        # A zero-byte ledger holds no processed fills.
        prior_exec = pd.DataFrame()
    if "broker_order_id" in prior_exec.columns:
        already = {_order_id_key(v) for v in prior_exec["broker_order_id"].astype(str)}

fills = fills[~fills["broker_order_id"].map(_order_id_key).isin(already)].copy()

# ============================================================
# Apply Fills (Core Reconciliation)
# ============================================================

exec_rows = []
log_rows = []


def _slippage_dollars(side: str, fill_px: float, ref_px: float, shares: int) -> float:
    """
    Signed slippage in dollars versus a reference price.

    BUY : (fill_px - ref_px) * shares
    SELL: (ref_px - fill_px) * shares
    Positive = cost (filled worse than the reference), negative = improvement.
    Returns NaN when the reference price is missing or the side is unknown.
    """
    if pd.isna(ref_px):
        return np.nan
    if side == "BUY":
        return (fill_px - ref_px) * shares
    if side == "SELL":
        return (ref_px - fill_px) * shares
    return np.nan


for _, f in fills.iterrows():
    tkr = f["ticker"]
    side = f["side"]
    sh = int(f["shares_filled"])
    fill_px = float(f["fill_price"])
    fee = float(f["broker_fee"])

    order_px = f.get("order_price", np.nan)
    order_px = float(order_px) if pd.notna(order_px) else np.nan

    plan_id = _clean_plan_id(f.get("plan_id", ""))

    # Missing exec_date falls back to today's date
    exec_date = (
        f["exec_date"].date().isoformat()
        if pd.notna(f.get("exec_date"))
        else datetime.now().date().isoformat()
    )

    signal_date = (
        f["signal_date"].date().isoformat()
        if pd.notna(f.get("signal_date"))
        else np.nan
    )

    gross = sh * fill_px
    cash_before = cash
    pos_before = positions.get(tkr, 0)

    # --------------------------------------------------------
    # Slippage (primary): vs SIGNAL price (system expectation)
    # Fallback: vs order reference price (legacy)
    # Only real plan ids are looked up: an empty key must never borrow a
    # signal price from the master file.
    # --------------------------------------------------------
    signal_px = signal_px_map.get(plan_id, np.nan) if plan_id else np.nan
    signal_px = float(signal_px) if pd.notna(signal_px) else np.nan

    if pd.notna(signal_px):
        slippage_ref = "SIGNAL"
        slippage = _slippage_dollars(side, fill_px, signal_px, sh)
    elif pd.notna(order_px):
        slippage_ref = "ORDER"
        slippage = _slippage_dollars(side, fill_px, order_px, sh)
    else:
        slippage_ref = "NA"
        slippage = np.nan

    # Work out the effect first and validate it, so a rejected fill never
    # leaves cash/positions half-updated.
    if side == "BUY":
        net_cash = -(gross + fee)
        pos_after = pos_before + sh
    elif side == "SELL":
        pos_after = pos_before - sh
        if pos_after < 0:
            raise RuntimeError(f"Oversell detected for {tkr}: have {pos_before}, tried to sell {sh}")
        net_cash = gross - fee
    else:
        raise ValueError(f"Invalid side: {side}")

    cash += net_cash
    if pos_after == 0:
        positions.pop(tkr, None)
    else:
        positions[tkr] = pos_after

    exec_rows.append({
        "exec_ts": datetime.now().isoformat(timespec="seconds"),
        "signal_date": signal_date,
        "exec_date": exec_date,
        "plan_id": plan_id,
        "ticker": tkr,
        "side": side,
        "shares": sh,
        "order_type": f.get("order_type", "MKT"),
        "signal_price": signal_px,
        "order_price": order_px,
        "fill_price": fill_px,
        "gross_notional": gross,
        "broker_fee": fee,
        "slippage_ref": slippage_ref,        # SIGNAL / ORDER / NA
        "slippage_dollars": slippage,        # signal-to-fill when available
        "net_cash_impact": net_cash,
        "broker_order_id": f["broker_order_id"],
        "cash_before": cash_before,
        "cash_after": cash,
        "pos_before": pos_before,
        "pos_after": pos_after,
    })

    log_rows.append(exec_rows[-1])

# ============================================================
# Save Executed Trades (Append-Only)
# ============================================================

exec_df = pd.DataFrame(exec_rows)
if os.path.exists(EXECUTED_TRADES_FILE):
    try:
        # Read prior rows as raw text (no NA/dtype inference) so rewriting the
        # ledger reproduces them verbatim instead of re-typing ids or numbers.
        prior = pd.read_csv(EXECUTED_TRADES_FILE, dtype=str, keep_default_na=False)
    except EmptyDataError:
        prior = pd.DataFrame()
    if not prior.empty:
        exec_df = pd.concat([prior, exec_df], ignore_index=True) if exec_rows else prior

# Only rewrite when there is something new. This also avoids writing a frame
# with no columns, which serializes to a header-less file that read_csv rejects
# with EmptyDataError on the next run.
if exec_rows:
    exec_df.to_csv(EXECUTED_TRADES_FILE, index=False)

# ============================================================
# Save Updated Live Portfolio
# ============================================================

asof_ts = datetime.now().isoformat(timespec="seconds")

# One row per open position (alphabetical by ticker); each row repeats the cash balance.
port_rows = [
    {"asof_ts": asof_ts, "cash": cash, "ticker": t, "shares": sh}
    for t, sh in sorted(positions.items())
]
if not port_rows:
    # No open positions: keep a single cash-only row so the file is never empty.
    port_rows = [{"asof_ts": asof_ts, "cash": cash, "ticker": "", "shares": 0}]

pd.DataFrame(port_rows).to_csv(LIVE_PORTFOLIO_FILE, index=False)

# ============================================================
# Save Reconciliation Log
# ============================================================

# Overwrites the diagnostics file with only this run's applied fills.
recon_log = pd.DataFrame(log_rows)
recon_log.to_csv(RECON_LOG_FILE, index=False)

for line in (
    "Reconciliation complete",
    f"New fills applied: {len(exec_rows)}",
    f"Final cash: {cash:,.2f}",
    f"Open positions: {len(positions)}",
):
    print(line)

# Diagnostic: confirm the master trades file exposes the columns the signal-price
# lookup needs. Guarded so a missing or empty master file (which
# load_signal_price_map tolerates) does not crash after reconciliation finished.
if os.path.exists(MASTER_TRADES_FILE):
    try:
        if str(MASTER_TRADES_FILE).lower().endswith(".parquet"):
            df = pd.read_parquet(MASTER_TRADES_FILE)
        else:
            df = pd.read_csv(MASTER_TRADES_FILE)
    except EmptyDataError:
        print("MASTER FILE IS EMPTY:", MASTER_TRADES_FILE)
    else:
        print("MASTER COLUMNS:", df.columns.tolist())
        print(f"Has {MASTER_PLAN_ID_COL}?", MASTER_PLAN_ID_COL in df.columns)
        print("Has signal px col?", MASTER_SIGNAL_PX_COL in df.columns)
else:
    print("MASTER FILE NOT FOUND:", MASTER_TRADES_FILE)




Reconciliation complete
New fills applied: 9
Final cash: 192,402.51
Open positions: 5
MASTER COLUMNS: ['plan_id', 'created_ts', 'signal_date', 'exec_date', 'ticker', 'side', 'shares', 'est_exec_px', 'est_value', 'reason', 'slope_rank', 'spy_above_200dma', 'cash_before', 'cash_after', 'portfolio_after', 'num_positions_after']
Has plan_id? True
Has signal px col? True
