In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
from ib_async import IB, util

In [None]:
# Let ib_async's blocking calls run inside the notebook's event loop.
util.startLoop()

# TWS API endpoint settings; clientId identifies this notebook's API session.
# NOTE(review): 7496 is presumably the TWS live-trading port -- confirm it is intended.
host = "127.0.0.1"
port = 7496
clientId = 101

TWS_CONNECTION = IB().connect(host=host, port=port, clientId=clientId, timeout=30)

## Data Source Options

**Option A: Live TWS Connection** - Run the connection cell above and the cells below to fetch trades from TWS. This also saves a snapshot for offline testing.

**Option B: Load from Snapshot** - Skip the TWS connection cells and run the "Load from snapshot" cell instead.

In [None]:
import pickle
from pathlib import Path
from datetime import datetime

SNAPSHOTS_DIR = Path("../data/snapshots")


def save_trades_snapshot(trades, snapshots_dir=SNAPSHOTS_DIR):
    """Pickle raw ib_async Trade objects to a timestamped file.

    Args:
        trades: Sized, picklable collection of trades (``len()`` is called on it).
        snapshots_dir: Target directory as ``str`` or ``Path``; created
            (including parents) when it does not exist.

    Returns:
        ``Path`` of the written ``tws_trades_<YYYYmmdd_HHMMSS>.pkl`` file.
    """
    # Coerce so a plain string works as well as a Path.
    snapshots_dir = Path(snapshots_dir)
    snapshots_dir.mkdir(parents=True, exist_ok=True)
    # Second-resolution local timestamp: two saves within the same second
    # resolve to the same file name and the later one overwrites the earlier.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = snapshots_dir / f"tws_trades_{timestamp}.pkl"
    with open(path, "wb") as f:
        pickle.dump(trades, f)
    print(f"Saved {len(trades)} trades to {path}")
    return path


def load_trades_snapshot(path=None, snapshots_dir=SNAPSHOTS_DIR):
    """Load pickled ib_async Trade objects from disk.

    Args:
        path: Snapshot file to read (``str`` or ``Path``). When ``None``, the
            last of the name-sorted ``tws_trades_*.pkl`` files in
            ``snapshots_dir`` is used; with timestamped names that is the
            most recent snapshot.
        snapshots_dir: Directory searched when ``path`` is ``None``
            (``str`` or ``Path``).

    Returns:
        The unpickled object (``len()`` is called on it for the log line).

    Raises:
        FileNotFoundError: If ``path`` is ``None`` and no snapshot file exists.
    """
    if path is None:
        # Coerce so a plain string directory works as well as a Path.
        snapshots = sorted(Path(snapshots_dir).glob("tws_trades_*.pkl"))
        if not snapshots:
            raise FileNotFoundError(f"No snapshots found in {snapshots_dir}")
        path = snapshots[-1]  # Most recent
        print(f"Loading most recent snapshot: {path.name}")
    # SECURITY: pickle.load can execute arbitrary code embedded in the file.
    # Only load snapshots written by save_trades_snapshot on a trusted machine.
    with open(path, "rb") as f:
        trades = pickle.load(f)
    print(f"Loaded {len(trades)} trades from {path}")
    return trades


def list_snapshots(snapshots_dir=SNAPSHOTS_DIR):
    """Print and return the available TWS trade snapshots.

    Args:
        snapshots_dir: Directory to search (``str`` or ``Path``).

    Returns:
        Name-sorted list of ``tws_trades_*.pkl`` paths; empty when none exist.
        With timestamped file names the order is oldest first.
    """
    # Coerce so a plain string directory works as well as a Path.
    snapshots = sorted(Path(snapshots_dir).glob("tws_trades_*.pkl"))
    for s in snapshots:
        print(s.name)
    return snapshots

In [None]:
from ib_async import IB, Contract, Option
from ib_async.ticker import Ticker
from typing import List
from enum import Enum
from abc import ABC, abstractmethod


class MarketDataType(Enum):
    """
    Integer codes for the market data mode requested from IBKR TWS.

    The member value is what gets passed to ``reqMarketDataType``.
    Reference: https://interactivebrokers.github.io/tws-api/market_data_type.html
    """

    # Values follow the numbering on the reference page above.
    LIVE = 1
    FROZEN = 2
    DELAYED = 3
    DELAYED_FROZEN = 4


class RealTimeSource(ABC):
    """Base class for real-time data sources; declares no abstract methods."""


class IbkrTws(RealTimeSource):
    """Thin wrapper exposing account, position and trade queries on an ``IB`` client."""

    def __init__(self, ib=None):
        """Store the client and switch it to live market data.

        Args:
            ib: The ``IB`` client to delegate to. The ``None`` default is kept
                for signature compatibility but is rejected.

        Raises:
            ValueError: If ``ib`` is ``None``.
        """
        if ib is None:
            # Without this guard the failure is an opaque AttributeError on
            # ``None.reqMarketDataType``.
            raise ValueError("IbkrTws requires an IB instance; got ib=None")
        self.ib = ib
        self.ib.reqMarketDataType(MarketDataType.LIVE.value)

    def get_accounts(self):
        """Return ``ib.managedAccounts()``."""
        return self.ib.managedAccounts()

    def get_positions_for_account(self, account_id: str):
        """Return ``ib.positions()`` restricted to ``account_id``."""
        return self.ib.positions(account=account_id)

    def get_quotes(self, con_ids: List[str]):
        """Placeholder: not implemented yet; always returns ``None``."""
        pass

    def get_trades(self):
        """Return ``ib.trades()``."""
        return self.ib.trades()

    def get_executions(self):
        """Return ``ib.executions()``."""
        return self.ib.executions()


# Wrap the live connection and fetch the client's trades (delegates to ib.trades()).
ibkr_realtime = IbkrTws(ib=TWS_CONNECTION)
trades_ibasync_objects = ibkr_realtime.get_trades()

# Save snapshot for offline testing (the returned path is the cell's output)
save_trades_snapshot(trades_ibasync_objects)

In [None]:
# Load from snapshot when TWS is not connected
# Uncomment and run this cell instead of the TWS connection cells above

# list_snapshots()  # See available snapshots
# trades_ibasync_objects = load_trades_snapshot()  # Loads most recent
# trades_ibasync_objects = load_trades_snapshot(SNAPSHOTS_DIR / "tws_trades_20250115_093000.pkl")  # Load specific

In [None]:
from ib_async import util

# Convert the Trade objects (live or loaded from a snapshot) into a DataFrame.
# NOTE(review): util.df appears to return None for an empty list -- confirm,
# since the cells below assume a DataFrame.
real_time_trades_df = util.df(trades_ibasync_objects)

In [None]:
# real_time_trades_df

In [None]:
from ngv_reports_ibkr.dtype_exporter import export_dtypes_all

# export_dtypes_all(real_time_trades_df, '../data', prefix='ibkr_tws_realtime_trades', only_export=['md'])

In [None]:
from ngv_reports_ibkr.dtype_exporter import export_dtypes_all
from ngv_reports_ibkr.expand_contract_columns import expand_all_trade_columns, expand_fills_and_logs
from ngv_reports_ibkr.transforms import Transforms

# Expand the trade columns first, then the "fills" and "log" columns
# (presumably nested ib_async objects -- confirm in expand_contract_columns).
expanded_df = expand_fills_and_logs(
    expand_all_trade_columns(real_time_trades_df),
    fills_col="fills",
    log_col="log",
)

# Keep only rows backed by an actual execution; log-only rows are dropped.
trades_df = Transforms.filter_to_executions(expanded_df)

# Export the dtype summary of the interpreted frame (Markdown only) to ../data.
export_dtypes_all(
    trades_df,
    "../data",
    prefix="ibkr_tws_realtime_trades_interpreted",
    only_export=["md"],
)

## Reconciliation Fields

View key fields that should match between TWS realtime and Flex report:

| TWS Field | Flex Field | Notes |
|-----------|------------|-------|
| `fill_execution_id` | `ibExecID` | ✅ JOIN KEY - Use this for reconciliation |
| `conId` | `conid` | Contract identifier |
| `fill_shares` | `quantity` | Quantity should match |
| `fill_price` | `tradePrice` | Price should match |
| `permId` | `ibOrderID` | ⚠️ Different ID systems - DO NOT JOIN |

In [None]:
# View key fields for reconciliation with Flex report
# These are the fields to inspect side by side with the Flex report

reconciliation_cols = [
    "permId",  # TWS order ID -> NOT comparable to Flex ibOrderID (different ID systems; do not join on it)
    "conId",  # Contract ID -> matches Flex conid
    "symbol",
    "secType",
    "strike",
    "right",
    "lastTradeDateOrContractMonth",
    "action",  # BUY/SELL
    "fill_shares",  # -> matches Flex quantity
    "fill_price",  # -> matches Flex tradePrice
    "fill_execution_id",  # -> matches Flex ibExecID (the join key)
    "fill_execution_time",
    "fill_commission",
]

# Show all trades with reconciliation fields
trades_df[reconciliation_cols]

In [None]:
from ngv_reports_ibkr.schemas.ibkr_tws_trades import ibkr_tws_trades_schema

# Validate the executions frame against the project's TWS trades schema.
# NOTE(review): presumably raises on a schema violation and returns the
# validated frame -- confirm against the schema library in use.
xdf = ibkr_tws_trades_schema.validate(trades_df)

## Monday: Download Flex Report Snapshot

Run the cell below to download the Flex report and save a timestamped CSV to `data/snapshots/`.

In [None]:
from ngv_reports_ibkr.download_trades import fetch_report
from ngv_reports_ibkr.config_helpers import get_config, get_ib_json

configs = get_config("../.env")
data = get_ib_json(configs)
# One timestamp for the whole run so every account's CSV shares the same suffix.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# The directory is otherwise only created by save_trades_snapshot, which the
# "load from snapshot" path never calls; to_csv does not create parent directories.
SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True)

for account in data.get("accounts", []):
    # `or 0` also covers a key that is present but empty/None, which int() rejects.
    query_id = int(account.get("annual") or 0)
    if query_id <= 0:
        print(f"Skipping {account['name']}: no annual query_id")
        continue

    report = fetch_report(account["flex_token"], query_id, cache_report_on_disk=True)

    # A single report may cover several account IDs; write one CSV per account.
    for aid in report.account_ids():
        flex_trades_df = report.trades_by_account_id(aid)
        if flex_trades_df is not None:
            csv_path = SNAPSHOTS_DIR / f"flex_trades_{aid}_{timestamp}.csv"
            flex_trades_df.to_csv(csv_path)
            print(f"Saved: {csv_path.name} ({len(flex_trades_df)} trades)")

## Reconcile TWS vs Flex

Load the most recent Flex snapshot and compare against the TWS trades by joining on **execution ID** (`fill_execution_id` = `ibExecID`).

In [None]:
# Show every row of the transposed frame at the end of this cell.
# Note: this changes the pandas display option for the rest of the session.
pd.set_option("display.max_rows", None)

snapshots = list_snapshots()
if not snapshots:
    # Fail with a clear message instead of a bare IndexError on snapshots[0].
    raise FileNotFoundError(f"No snapshots found in {SNAPSHOTS_DIR}")

# NOTE(review): list_snapshots() sorts by file name, so with timestamped names
# index 0 is the OLDEST snapshot. Use snapshots[-1] if the most recent is intended.
the_one = snapshots[0]

trades_ibasync_objects = load_trades_snapshot(the_one)

snapshot_real_time_trades_df = util.df(trades_ibasync_objects)

expanded_df = expand_all_trade_columns(snapshot_real_time_trades_df)
expanded_df = expand_fills_and_logs(expanded_df, fills_col="fills", log_col="log")

# Filter to only actual executions (removes log-only rows)
trades_df = Transforms.filter_to_executions(expanded_df)

# Only the last expression of a cell is displayed, so the frame is shown once here.
trades_df.T

In [None]:
# Find the most recent Flex snapshot for your account
account_id = "U123"  # Update with your account ID

# File names end in a YYYYmmdd_HHMMSS timestamp, so the last sorted entry is the newest.
flex_csvs = sorted(SNAPSHOTS_DIR.glob(f"flex_trades_{account_id}_*.csv"))
if not flex_csvs:
    print(f"No Flex snapshots found for {account_id}. Run the download cell above first.")
else:
    flex_csv_path = flex_csvs[-1]
    print(f"Using Flex snapshot: {flex_csv_path.name}")

    flex_df = pd.read_csv(flex_csv_path, low_memory=False)

    # Filter TWS trades to the selected account
    tws_account_trades = trades_df[trades_df["account"] == account_id].copy()

    # Get execution IDs from TWS trades (this is the correct join key)
    tws_exec_ids = tws_account_trades["fill_execution_id"].tolist()
    print(f"\nTWS execution IDs to look up: {len(tws_exec_ids)} executions")

    # Find matching trades in Flex report by execution ID
    flex_matches = flex_df[flex_df["ibExecID"].isin(tws_exec_ids)]
    print(f"Flex matches found: {len(flex_matches)}")

    if len(flex_matches) > 0:
        # Side-by-side comparison
        tws_subset = tws_account_trades[["fill_execution_id", "permId", "conId", "symbol", "action", "fill_shares", "fill_price"]].copy()
        tws_subset.columns = ["exec_id", "tws_permId", "tws_conId", "tws_symbol", "tws_side", "tws_qty", "tws_price"]

        flex_subset = flex_matches[["ibExecID", "ibOrderID", "conid", "symbol", "buySell", "quantity", "tradePrice"]].copy()
        flex_subset.columns = ["exec_id", "flex_ibOrderID", "flex_conid", "flex_symbol", "flex_side", "flex_qty", "flex_price"]

        # Join on execution ID (the correct join key)
        comparison = pd.merge(tws_subset, flex_subset, on="exec_id", how="outer", indicator=True)

        matched_count = (comparison["_merge"] == "both").sum()
        tws_only_count = (comparison["_merge"] == "left_only").sum()
        # flex_subset was pre-filtered to the TWS execution IDs above, so this
        # count is expected to be 0; it does not list Flex trades missing from TWS.
        flex_only_count = (comparison["_merge"] == "right_only").sum()

        print(f"\n✅ Matched: {matched_count}")
        print(f"⚠️  TWS only: {tws_only_count}")
        print(f"⚠️  Flex only: {flex_only_count}")

        # Validate matched fields
        matched = comparison[comparison["_merge"] == "both"]
        if len(matched) > 0:
            conid_match = (matched["tws_conId"] == matched["flex_conid"]).all()
            # Compare magnitudes: TWS fill sizes are unsigned, while Flex is
            # understood to report sells as negative quantities, so a signed
            # comparison would flag every sell as a mismatch.
            # NOTE(review): confirm the sign convention against your Flex query.
            qty_match = (matched["tws_qty"].abs() == matched["flex_qty"].abs()).all()
            # Prices are floats; allow a small tolerance instead of exact equality.
            price_match = ((matched["tws_price"] - matched["flex_price"]).abs() < 0.0001).all()

            print(f"\nValidation:")
            print(f"  Contract ID match: {'✅' if conid_match else '❌'}")
            print(f"  Quantity match: {'✅' if qty_match else '❌'}")
            print(f"  Price match: {'✅' if price_match else '❌'}")

            # Note: permId != ibOrderID (different ID systems)
            print(f"\nNote: permId and ibOrderID use different ID systems and won't match")

        display(comparison)
    else:
        print("\n⚠️  No matches found. The TWS trades may not be in this Flex report yet (1-day delay).")