In [2]:
import pandas as pd
from datetime import datetime, date
class TradeLedger:
    def __init__(self):
        cols = [
            "timestamp","action","instrument_type","underlying",
            "option_type","strike","expiry","quantity","price",
            "signed_quantity","total_cost"
        ]
        self.trades_df = pd.DataFrame(columns=cols)

    def record_trades(self, new_trades: pd.DataFrame):
        """Batch append a DataFrame of trades."""
        print(f"new_trades: {new_trades}")
        self.trades_df = pd.concat([self.trades_df, new_trades], ignore_index=True)
        print(f"self.trades_df: {self.trades_df}")

    def record_trade(self,
                     timestamp: datetime,
                     action: str,
                     instrument_type: str,
                     underlying: str,
                     option_type: str,
                     strike: float,
                     expiry: date,
                     quantity: float,
                     price: float):
        """
        Add a single trade.
        Automatically computes signed_quantity and total_cost.
        """
        signed_qty = quantity if action == "buy" else -quantity
        row = {
            "timestamp": timestamp,
            "action": action,
            "instrument_type": instrument_type,
            "underlying": underlying,
            "option_type": option_type,
            "strike": strike,
            "expiry": expiry,
            "quantity": quantity,
            "price": price,
            "signed_quantity": signed_qty,
            "total_cost": signed_qty * price
        }
        # create 1-row DataFrame and concat
        self.trades_df = pd.concat(
            [self.trades_df, pd.DataFrame([row])],
            ignore_index=True
        )

    @property
    def trades(self):
        """All trades, sorted by timestamp."""
        return self.trades_df.sort_values("timestamp").reset_index(drop=True)

In [9]:
from __future__ import annotations
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Callable
from datetime import datetime, date, time
from zoneinfo import ZoneInfo

TZ_NY  = ZoneInfo("America/New_York")
TZ_UTC = ZoneInfo("UTC")

# ---------- Utilities ----------
def to_utc(ts, assume_tz=TZ_NY):
    """Return tz-aware UTC pandas Timestamps/Series/Index."""
    if isinstance(ts, (pd.Series, pd.Index)):
        out = pd.to_datetime(ts, errors="coerce")
        if out.dt.tz is None:
            out = out.dt.tz_localize(assume_tz).dt.tz_convert(TZ_UTC)
        else:
            out = out.dt.tz_convert(TZ_UTC)
        return out
    # scalar
    t = pd.Timestamp(ts)
    if t.tzinfo is None:
        t = t.tz_localize(assume_tz).tz_convert(TZ_UTC)
    else:
        t = t.tz_convert(TZ_UTC)
    return t



def to_nyc(ts, assume_tz=TZ_UTC):
    """
    Return tz-aware America/New_York pandas Timestamps/Series/Index.
    If input is naive, assume it is in `assume_tz` (default: UTC) first.
    """
    if isinstance(ts, (pd.Series, pd.Index)):
        out = pd.to_datetime(ts, errors="coerce")
        if out.dt.tz is None:
            out = out.dt.tz_localize(assume_tz).dt.tz_convert(TZ_NY)
        else:
            out = out.dt.tz_convert(TZ_NY)
        return out

    t = pd.Timestamp(ts)
    if t.tzinfo is None:
        t = t.tz_localize(assume_tz).tz_convert(TZ_NY)
    else:
        t = t.tz_convert(TZ_NY)
    return t

def session_close_utc(d: date, close=time(16, 30)) -> pd.Timestamp:
    """NY close for a given *calendar* date, expressed in UTC."""
    return pd.Timestamp(datetime.combine(d, close, tzinfo=TZ_NY)).tz_convert(TZ_UTC)

def to_date_only(x):
    """Return Python date(s) from datetime-like."""
    if isinstance(x, (pd.Series, pd.Index)):
        return pd.to_datetime(x, errors="coerce").dt.tz_localize(None).dt.date
    if isinstance(x, datetime):
        return x.date()
    if isinstance(x, date):
        return x
    return pd.to_datetime(x).date()

# ---------- Payoff ----------
def compute_payoff_vec(spots: np.ndarray,
                       strikes: np.ndarray,
                       opt_types: np.ndarray) -> np.ndarray:
    is_call = (opt_types == "call")
    call_payoff = np.maximum(spots - strikes, 0)
    put_payoff  = np.maximum(strikes - spots, 0)
    return np.where(is_call, call_payoff, put_payoff)

# ---------- Ledger ----------
class TradeLedger:
    COLS = [
        "timestamp", "action", "instrument_type", "underlying",
        "option_type", "strike", "expiry", "quantity", "price",
        "signed_quantity", "total_cost"
    ]
    def __init__(self):
        self.trades_df = pd.DataFrame(columns=self.COLS)

    # --- normalization on write ---
    def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
        out = df.copy()

        # required fields presence
        missing = [c for c in ["timestamp","action","instrument_type","underlying",
                               "option_type","strike","expiry","quantity","price"]
                   if c not in out.columns]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        out["timestamp"] = to_utc(out["timestamp"])
        out["expiry"]    = to_date_only(out["expiry"])
        out["underlying"] = out["underlying"].astype("string")
        for c in ["action","instrument_type","option_type"]:
            out[c] = out[c].astype("category")

        # numeric
        out["strike"]   = pd.to_numeric(out["strike"], errors="coerce")
        out["quantity"] = pd.to_numeric(out["quantity"], errors="coerce")
        out["price"]    = pd.to_numeric(out["price"], errors="coerce")

        if "signed_quantity" not in out.columns or out["signed_quantity"].isna().any():
            out["signed_quantity"] = np.where(out["action"].astype(str)=="buy",
                                              out["quantity"], -out["quantity"])
        if "total_cost" not in out.columns or out["total_cost"].isna().any():
            out["total_cost"] = out["signed_quantity"] * out["price"]
        return out[self.COLS]

    def record_trades(self, new_trades: pd.DataFrame):
        self.trades_df = pd.concat([self.trades_df, self._normalize_df(new_trades)],
                                   ignore_index=True)

    def record_trade(self, *,
                     timestamp,
                     action: str,
                     instrument_type: str,
                     underlying: str,
                     option_type: str,
                     strike: float,
                     expiry,
                     quantity: float,
                     price: float):
        row = pd.DataFrame([{
            "timestamp": timestamp,
            "action": action,
            "instrument_type": instrument_type,
            "underlying": underlying,
            "option_type": option_type,
            "strike": strike,
            "expiry": expiry,
            "quantity": quantity,
            "price": price
        }])
        self.record_trades(row)

    @property
    def trades(self):
        return self.trades_df.sort_values("timestamp").reset_index(drop=True)

# ---------- Portfolio ----------
class PortfolioView:
    def __init__(self, ledger: TradeLedger):
        self.ledger = ledger

    def positions_at(self, as_of) -> pd.DataFrame:
        """as_of can be date or datetime; compare in UTC."""
        if isinstance(as_of, date) and not isinstance(as_of, datetime):
            as_of_utc = session_close_utc(as_of)
        else:
            as_of_utc = to_utc(as_of)

        df = self.ledger.trades_df
        if df.empty:
            return df

        df = df[df["timestamp"] <= as_of_utc]
        grouped = (
            df.groupby(["instrument_type","underlying","option_type","strike","expiry"],
                       dropna=False)["signed_quantity"].sum().reset_index()
        )
        return grouped[grouped["signed_quantity"] != 0]

# ---------- Vectorized expiry ----------
def expire_trades_vectorized(
    ledger: TradeLedger,
    as_of_date: date,
    price_lookup_fn: Callable[[np.ndarray, date], np.ndarray],
    close_time: time = time(16, 30)
):
    """Close all options with expiry <= as_of_date at NY close on as_of_date."""
    close_ts_utc = session_close_utc(as_of_date, close=close_time)
    opens = PortfolioView(ledger).positions_at(close_ts_utc)
    if opens.empty:
        return

    to_close = opens[opens["expiry"] <= as_of_date].copy()
    if to_close.empty:
        return

    # spots for each underlying on as_of_date close
    spots = price_lookup_fn(to_close["underlying"].to_numpy(), as_of_date)

    payoffs = compute_payoff_vec(
        spots,
        to_close["strike"].to_numpy(),
        to_close["option_type"].to_numpy()
    )

    actions = np.where(to_close["signed_quantity"].to_numpy() > 0, "sell", "buy")
    qtys    = np.abs(to_close["signed_quantity"].to_numpy())

    batch = pd.DataFrame({
        "timestamp":       close_ts_utc,
        "action":          actions,
        "instrument_type": "option",
        "underlying":      to_close["underlying"].to_numpy(),
        "option_type":     to_close["option_type"].to_numpy(),
        "strike":          to_close["strike"].to_numpy(),
        "expiry":          to_close["expiry"].to_numpy(),  # keep as date
        "quantity":        qtys,
        "price":           payoffs,
    })
    ledger.record_trades(batch)

In [None]:
dt1=pd.Timestamp("2023-12-01 9:30:00", tz="America/New_York")

session_close_utc(dt1)
to_utc(dt1)




Timestamp('2023-12-01 14:30:00+0000', tz='UTC')