# Candidate Starter Notebook

Follow the prompts. Keep answers concise; vectorize where possible.

In [85]:
# Imports
import pandas as pd
import numpy as np

from answers import (
    br_norm_answer,
    fa_norm_answer,
    break_pos_answer,
    br_trd_norm_answer,
    fa_trd_norm_answer,
    fa_p_answer,
    br_p_answer,
    pnl_cmp_answer,
    pnl_cmp_v2_answer
)

# Paths
DATA = "../data"
POS_DIR = f"{DATA}/positions"
TRD_DIR = f"{DATA}/trades"

pd.set_option("display.float_format", lambda v: f"{v:,.4f}")

## Step 1 — Positions Recon (as of 2025‑09‑11)

**Prompt:**
1) Load Fund Admin and Broker positions.

2) Normalize to common schema: `'date', 'security_description', 'security_id', 'quantity', 'price','multiplier', 'avg_cost', 'start_of_month_price'`.

3) Merge on `security_id` (prefer `validate='one_to_one'`).

4) Produce break table: `unique_id, qty_admin, qty_broker, qty_diff, avg_cost_admin, avg_cost_broker, avg_cost_diff`. 

In [86]:
# Load positions
fa_raw = pd.read_csv(f"{POS_DIR}/fund_admin_positions_2025-09-11.csv", parse_dates=["date"])
br_raw = pd.read_csv(f"{POS_DIR}/broker_positions_2025-09-11.csv", parse_dates=["date"])

#### Get a feel for the data

In [87]:
# View the data and describe the data verbally

In [4]:
# Display fund admin data
print('Fund admin data:')
display(fa_raw)

Fund admin data:


Unnamed: 0,lot_id,tag,date,security_description,unique_id,lot_qty,price,contract_multiplier,average_cost,start_of_month_price,lot_sign
0,a,Hedge,2025-09-11,GOLD CMX,GC_Z5,2,1943.2,100,1935.2,1920.0,1
1,b,Long Equity,2025-09-11,EMINI,ES_Z5,1,5578.25,50,5560.0,5480.0,1
2,c,Trend,2025-09-11,LEAN HOGS,HE_Z5,1,76.45,400,77.1,78.25,-1
3,d,Hedge,2025-09-11,GOLD CMX,GC_Z5,1,1943.2,100,1934.9,1920.0,1


In [5]:
# Display broker data
print('Broker data:')
display(br_raw)

Broker data:


Unnamed: 0,date,ticker,unique_id,quantity,price,contract_multiplier,average_cost,start_of_month_price
0,2025-09-11,GCZ5,GC_Z5,3,1943.2,100,1935.1,1920.0
1,2025-09-11,ESZ5,ES_Z5,1,5578.25,50,5560.0,5480.0
2,2025-09-11,HEZ5,HE_Z5,-1,76.45,400,77.1,78.25


#### Normalize

In [6]:
# Converters to update broker/fund admin column names to LTA schema names
pos_col_converters = {
'date': 'date',
'ticker': 'security_description',
'unique_id': 'security_id',
'quantity': 'quantity',
'price': 'price',
'contract_multiplier': 'multiplier',
'average_cost': 'avg_cost'
}

# LTA only keeps these columns in our position schemas
keep_pos_columns = [
'date',
'security_description',
'security_id',
'quantity',
'price',
'multiplier',
'avg_cost',
'start_of_month_price',
]

In [7]:
# I have normalized the fund admin data for you
# Can you walk me through what the code is doing line by line?

In [8]:
def normalize_fa_pos(fa_pos_raw, lot_col, avg_cost_col):
    # 1. Copy raw data
    d = fa_pos_raw.copy()
    # 2. Sign the quantity
    d['lot_signed'] = d[lot_col] * d['lot_sign']
    # 3. Compute average cost for lots and total quantity for each ticker
    # Get quantity * price
    d['qty_times_price'] = d[avg_cost_col] * d['lot_signed'] 
    # Groupby and aggregate
    pos_group_cols = [
        'date',
        'security_description',
        'price',
        'contract_multiplier',
        'start_of_month_price',
        'unique_id',
        ]
    
    agg = d.groupby(pos_group_cols).agg(
        quantity=('lot_signed', 'sum'), total_qty_times_price=('qty_times_price', 'sum')
        )
    # Average cost equals quantity * price / total quantity
    agg['avg_cost'] = agg['total_qty_times_price'] / agg['quantity']
    # reset index
    agg = agg.reset_index()
    # name columns
    agg = agg.rename(columns=pos_col_converters)
    # only keep LTA schema columns
    agg = agg[keep_pos_columns]
    # return dataframe
    return agg

In [9]:
display(fa_raw)
fa_norm = normalize_fa_pos(fa_raw, 'lot_qty', 'average_cost').sort_values(by='security_id')
display(fa_norm)

Unnamed: 0,lot_id,tag,date,security_description,unique_id,lot_qty,price,contract_multiplier,average_cost,start_of_month_price,lot_sign
0,a,Hedge,2025-09-11,GOLD CMX,GC_Z5,2,1943.2,100,1935.2,1920.0,1
1,b,Long Equity,2025-09-11,EMINI,ES_Z5,1,5578.25,50,5560.0,5480.0,1
2,c,Trend,2025-09-11,LEAN HOGS,HE_Z5,1,76.45,400,77.1,78.25,-1
3,d,Hedge,2025-09-11,GOLD CMX,GC_Z5,1,1943.2,100,1934.9,1920.0,1


Unnamed: 0,date,security_description,security_id,quantity,price,multiplier,avg_cost,start_of_month_price
0,2025-09-11,EMINI,ES_Z5,1,5578.25,50,5560.0,5480.0
1,2025-09-11,GOLD CMX,GC_Z5,3,1943.2,100,1935.1,1920.0
2,2025-09-11,LEAN HOGS,HE_Z5,-1,76.45,400,77.1,78.25


In [10]:
# TODO: normalize broker data to LTA Schema Requirements, sort values by security_id
# Name the normalized dataframe br_norm
# Note that this is FAR easier than the normalization above -- you do not need a function

In [11]:
br_norm = br_raw.rename(columns=pos_col_converters).copy()[keep_pos_columns].sort_values(by='security_id')
display(br_norm)

Unnamed: 0,date,security_description,security_id,quantity,price,multiplier,avg_cost,start_of_month_price
1,2025-09-11,ESZ5,ES_Z5,1,5578.25,50,5560.0,5480.0
0,2025-09-11,GCZ5,GC_Z5,3,1943.2,100,1935.1,1920.0
2,2025-09-11,HEZ5,HE_Z5,-1,76.45,400,77.1,78.25


In [12]:
if br_norm.equals(br_norm_answer):
    print('Congratulations')
else:
    print('Your answer: ')
    display(br_norm)
    print('My answer: ')
    display(br_norm_answer)
br_norm = br_norm_answer.copy()

Congratulations


#### Merge

In [13]:
# Now that we have normalized broker and fund admin data, 
# we will merge the 2 dataframes

In [14]:
# One-to-one join on unique_id
pos = (fa_norm.add_suffix("_fa")
       .merge(br_norm.add_suffix("_br"),
              left_on="security_id_fa", right_on="security_id_br",
              how="outer", validate="one_to_one"))

display(pos)

Unnamed: 0,date_fa,security_description_fa,security_id_fa,quantity_fa,price_fa,multiplier_fa,avg_cost_fa,start_of_month_price_fa,date_br,security_description_br,security_id_br,quantity_br,price_br,multiplier_br,avg_cost_br,start_of_month_price_br
0,2025-09-11,EMINI,ES_Z5,1,5578.25,50,5560.0,5480.0,2025-09-11,ESZ5,ES_Z5,1,5578.25,50,5560.0,5480.0
1,2025-09-11,GOLD CMX,GC_Z5,3,1943.2,100,1935.1,1920.0,2025-09-11,GCZ5,GC_Z5,3,1943.2,100,1935.1,1920.0
2,2025-09-11,LEAN HOGS,HE_Z5,-1,76.45,400,77.1,78.25,2025-09-11,HEZ5,HE_Z5,-1,76.45,400,77.1,78.25


#### Compute breaks and make a breaks report

In [15]:
# TODO: Build break table
# 1. Keep only 1 security_id column
pos['security_id'] = ''
for _, r in pos.iterrows():
    assert r.security_id_fa == r.security_id_br
    r['security_id'] = r.security_id_fa
pos = pos.drop(columns=['security_id_fa', 'security_id_br'])

# Compute quantity breaks
pos['qty_diff'] = pos["quantity_br"].fillna(0) - pos["quantity_fa"].fillna(0)

# Compute average cost difference
pos["avg_cost_diff"] = pos["avg_cost_br"] - pos["avg_cost_fa"]

# Output a clean dataframe, sort values by security_id
break_pos = pos[[
    "security_id",
    "security_description_fa","security_description_br",
    "quantity_fa","quantity_br","qty_diff",
    "avg_cost_fa","avg_cost_br","avg_cost_diff"
]].sort_values("security_id").reset_index(drop=True)

display(break_pos)

Unnamed: 0,security_id,security_description_fa,security_description_br,quantity_fa,quantity_br,qty_diff,avg_cost_fa,avg_cost_br,avg_cost_diff
0,,EMINI,ESZ5,1,1,0,5560.0,5560.0,0.0
1,,GOLD CMX,GCZ5,3,3,0,1935.1,1935.1,-0.0
2,,LEAN HOGS,HEZ5,-1,-1,0,77.1,77.1,0.0


In [16]:
if break_pos.equals(break_pos_answer):
    print('Congratulations')
else:
    print('Your answer: ')
    display(break_pos)
    print('My answer: ')
    display(break_pos_answer)
break_pos = break_pos_answer.copy()

Congratulations


## Step 2 — Trades + EOD P&L (Trade Date 2025‑09‑12)

**Prompt:**
1) Load **both** trade files; compute `notional`.
2) Merge marks; compute per-trade EOD `pnl`.
3) Aggregate per `unique_id` and compare Admin vs Broker (identify ES break).
4) (Optional) Add carry P&L from 09/11 to 09/12 and show total P&L.

In [17]:
# Load trades
fa_trd = pd.read_csv(f"{TRD_DIR}/fund_admin_trades_2025-09-12.csv", parse_dates=["trade_date","settle_date"])
br_trd = pd.read_csv(f"{TRD_DIR}/broker_trades_2025-09-12.csv", parse_dates=["trade_date","settle_date"])

In [18]:
# View the data and describe the data verbally

In [19]:
# Display fund admin data
print('Fund admin data:')
display(fa_trd)

Fund admin data:


Unnamed: 0,trade_id,ticker,unique_id,trade_date,settle_date,price,commissions,quantity,contract_multiplier
0,a,GOLD CMX,GC_Z5,2025-09-12,2025-09-15,1949.0,6.25,1,100
1,b,EMINI,ES_Z5,2025-09-12,2025-09-15,5589.5,15.0,1,50
2,c,LEAN HOGS,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400
3,d,GOLD CMX,GC_Z5,2025-09-12,2025-09-15,1949.6,6.25,1,100


In [20]:
# Display broker data
print('Broker data:')
display(br_trd)

Broker data:


Unnamed: 0,ticker,unique_id,trade_date,settle_date,price,commissions,quantity,contract_multiplier
0,GCZ5,GC_Z5,2025-09-12,2025-09-15,1949.3,12.5,2,100
1,ESZ5,ES_Z5,2025-09-12,2025-09-15,5588.25,15.0,1,50
2,HEZ5,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400


In [21]:
# This time we have not normalized the data for you, 
# TODO: Normalize trades data
# Feel free to refer above to copy code
# The schema requirements are below

# Converters to update broker/fund admin column names to LTA schema names
trd_col_converters = {
    'date': 'date',
    'ticker': 'security_description',
    'unique_id': 'security_id',
    'quantity': 'quantity',
    'price': 'price',
    'contract_multiplier': 'multiplier',
}

# Final columns that should be in your trades dataframe
keep_trd_cols = [
    'security_description',
    'security_id',
    'trade_date',
    'settle_date',
    'price',
    'commissions',
    'quantity',
    'multiplier']

In [22]:
# TODO: normalize broker trade data to LTA Schema Requirements, sort values by security_id
# Name the normalized dataframe br_trd_norm
br_trd_norm = br_trd.rename(columns=trd_col_converters).copy()[keep_trd_cols].sort_values(by='security_id')
display(br_trd_norm)

Unnamed: 0,security_description,security_id,trade_date,settle_date,price,commissions,quantity,multiplier
1,ESZ5,ES_Z5,2025-09-12,2025-09-15,5588.25,15.0,1,50
0,GCZ5,GC_Z5,2025-09-12,2025-09-15,1949.3,12.5,2,100
2,HEZ5,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400


In [23]:
if br_trd_norm.equals(br_trd_norm_answer):
    print('Congratulations')
else:
    print('Your answer: ')
    display(br_trd_norm)
    print('My answer: ')
    display(br_trd_norm_answer)
br_trd_norm = br_trd_norm_answer.copy()

Congratulations


In [24]:
# TODO: normalize fund admin trade data to LTA Schema Requirements, sort values by security_id

In [25]:
# Hints available:
# 1. here are the columns you need to group by and aggregate over
d = fa_trd.copy()
d = d.rename(columns={'unique_id':'security_id'})
trd_grp_cols =     ['ticker', 
                    'security_id',
                    'trade_date',
                    'settle_date',
                    'contract_multiplier']


# Groupby and aggregate
# 1. Total quantity
# 2. Total commissions
# 3. Price (how will you agg this?)
d['qty_times_price'] = d['quantity'] * d['price']

agg = d.groupby(trd_grp_cols).agg(
        quantity=('quantity', 'sum'), 
        total_qty_times_price=('qty_times_price', 'sum'),
        commissions=('commissions', 'sum')
        )

# Average price equals quantity * price / total quantity
agg['price'] = agg['total_qty_times_price'] / agg['quantity']

# Reset Index, rename columns, keep LTA schema columns, sort_values
agg = agg.reset_index()
fa_trd_norm = agg.rename(columns=trd_col_converters).copy()[keep_trd_cols].sort_values(by='security_id')

In [26]:
if fa_trd_norm.equals(fa_trd_norm_answer):
    print('Congratulations')
else:
    print('Your answer: ')
    display(fa_trd_norm)
    print('My answer: ')
    display(fa_trd_norm_answer)
fa_trd_norm = fa_trd_norm_answer.copy()

Congratulations


In [27]:
# Now we have our daily trades
# We want to calculate our daily PNL to see if broker and fund admin are matching
# Here are the end of day marks for 09/12
MARKS = f"{DATA}/marks_2025-09-12.csv"
marks  = pd.read_csv(MARKS, parse_dates=["date"])
display(marks)

Unnamed: 0,date,security_id,close
0,2025-09-12,GC_Z5,1951.0
1,2025-09-12,ES_Z5,5592.0
2,2025-09-12,HE_Z5,76.3


In [28]:
# Function to merge closing marks with normalized trades
def merge_close_marks_on_trades(df, marks):
    d = df.copy()
    out = df.merge(marks[["security_id","close"]], on="security_id", how="left", validate="many_to_one")
    return out

fa_trd_cl = merge_close_marks_on_trades(fa_trd_norm, marks)

br_trd_cl = merge_close_marks_on_trades(br_trd_norm, marks)

In [29]:
print('Fund admin trades: ')
display(fa_trd_cl)
print('Broker trades: ')
display(br_trd_cl)

Fund admin trades: 


Unnamed: 0,security_description,security_id,trade_date,settle_date,price,commissions,quantity,multiplier,close
0,EMINI,ES_Z5,2025-09-12,2025-09-15,5589.5,15.0,1,50,5592.0
1,GOLD CMX,GC_Z5,2025-09-12,2025-09-15,1949.3,12.5,2,100,1951.0
2,LEAN HOGS,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400,76.3


Broker trades: 


Unnamed: 0,security_description,security_id,trade_date,settle_date,price,commissions,quantity,multiplier,close
0,ESZ5,ES_Z5,2025-09-12,2025-09-15,5588.25,15.0,1,50,5592.0
1,GCZ5,GC_Z5,2025-09-12,2025-09-15,1949.3,12.5,2,100,1951.0
2,HEZ5,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400,76.3


In [30]:
# TODO: Write a function that takes in a trades dataframe and calculates trade PNL
# You can ignore comms -- but tell us how you would account for it
# The returned df should have one additional column called pnl

In [31]:
def calculate_notional_and_pnl(df):
    df = df.copy()
    df["notional"] = df["price"] * df["quantity"] * df["multiplier"]
    df["pnl"] = (df["close"] - df["price"]) * df["quantity"] * df["multiplier"] 
    return df

In [32]:
fa_p = calculate_notional_and_pnl(fa_trd_cl)
assert type(fa_p) == pd.core.frame.DataFrame

In [33]:
if fa_p.equals(fa_p_answer):
    print('Congratulations')
else:
    print('Your answer: ')
    display(fa_p)
    print('My answer: ')
    display(fa_p_answer)
fa_p = fa_p_answer.copy()

Congratulations


In [34]:
br_p = calculate_notional_and_pnl(br_trd_cl)
assert type(fa_p) == pd.core.frame.DataFrame

In [35]:
if br_p.equals(br_p_answer):
    print('Congratulations')
else:
    print('Your answer: ')
    display(br_p)
    print('My answer: ')
    display(br_p_answer)
br_p = br_p_answer.copy()

Congratulations


In [36]:
# Aggregate per instrument
fa_p = fa_p.groupby("security_id", as_index=False)["pnl"].sum().rename(columns={"pnl":"pnl_admin"})
br_p = br_p.groupby("security_id", as_index=False)["pnl"].sum().rename(columns={"pnl":"pnl_broker"})

pnl_cmp = (fa_p.merge(br_p, on="security_id", how="outer").fillna(0.0))
pnl_cmp["pnl_break"] = pnl_cmp["pnl_broker"] - pnl_cmp["pnl_admin"]
pnl_cmp = pnl_cmp.sort_values("security_id").reset_index(drop=True)

In [37]:
pnl_cmp

Unnamed: 0,security_id,pnl_admin,pnl_broker,pnl_break
0,ES_Z5,125.0,187.5,62.5
1,GC_Z5,340.0,340.0,0.0
2,HE_Z5,180.0,180.0,0.0


## Step 3 — After Broker Correction (ES price fixed)

Reload broker v2 trades and confirm the **P&L break clears**.

In [38]:
br_trd_v2 = pd.read_csv(f"{TRD_DIR}/broker_trades_2025-09-12_v2.csv", parse_dates=["trade_date","settle_date"])
print("New data: ")
display(br_trd_v2)
print("Old data: ")
display(br_trd)

New data: 


Unnamed: 0,ticker,unique_id,trade_date,settle_date,price,commissions,quantity,contract_multiplier
0,GCZ5,GC_Z5,2025-09-12,2025-09-15,1949.3,12.5,2,100
1,ESZ5,ES_Z5,2025-09-12,2025-09-15,5589.5,15.0,1,50
2,HEZ5,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400


Old data: 


Unnamed: 0,ticker,unique_id,trade_date,settle_date,price,commissions,quantity,contract_multiplier
0,GCZ5,GC_Z5,2025-09-12,2025-09-15,1949.3,12.5,2,100
1,ESZ5,ES_Z5,2025-09-12,2025-09-15,5588.25,15.0,1,50
2,HEZ5,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400


In [39]:
# TODO: recompute the pnl_cmp table with the updated broker trade file
# name the new table pnl_cmp_v2
# make sure that all variables where you are using the new data end with _v2
# The final table should be named pnl_cmp_v2

In [40]:
br_trd_norm_v2 = br_trd_v2.rename(columns=trd_col_converters).copy()[keep_trd_cols].sort_values(by='security_id')
display(br_trd_norm_v2)

Unnamed: 0,security_description,security_id,trade_date,settle_date,price,commissions,quantity,multiplier
1,ESZ5,ES_Z5,2025-09-12,2025-09-15,5589.5,15.0,1,50
0,GCZ5,GC_Z5,2025-09-12,2025-09-15,1949.3,12.5,2,100
2,HEZ5,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400


In [41]:
br_trd_cl_v2 = merge_close_marks_on_trades(br_trd_norm_v2, marks)
display(br_trd_cl_v2)

Unnamed: 0,security_description,security_id,trade_date,settle_date,price,commissions,quantity,multiplier,close
0,ESZ5,ES_Z5,2025-09-12,2025-09-15,5589.5,15.0,1,50,5592.0
1,GCZ5,GC_Z5,2025-09-12,2025-09-15,1949.3,12.5,2,100,1951.0
2,HEZ5,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400,76.3


In [42]:
br_p_v2 = calculate_notional_and_pnl(br_trd_cl_v2)
display(br_p_v2)

Unnamed: 0,security_description,security_id,trade_date,settle_date,price,commissions,quantity,multiplier,close,notional,pnl
0,ESZ5,ES_Z5,2025-09-12,2025-09-15,5589.5,15.0,1,50,5592.0,279475.0,125.0
1,GCZ5,GC_Z5,2025-09-12,2025-09-15,1949.3,12.5,2,100,1951.0,389860.0,340.0
2,HEZ5,HE_Z5,2025-09-12,2025-09-15,76.75,8.0,-1,400,76.3,-30700.0,180.0


In [43]:
br_p_v2 = br_p_v2.groupby("security_id", as_index=False)["pnl"].sum().rename(columns={"pnl":"pnl_broker"})

pnl_cmp_v2 = (fa_p.merge(br_p_v2, on="security_id", how="outer").fillna(0.0))
pnl_cmp_v2["pnl_break"] = pnl_cmp_v2["pnl_broker"] - pnl_cmp["pnl_admin"]
pnl_cmp_v2 = pnl_cmp_v2.sort_values("security_id").reset_index(drop=True)

In [44]:
pnl_cmp_v2

Unnamed: 0,security_id,pnl_admin,pnl_broker,pnl_break
0,ES_Z5,125.0,125.0,0.0
1,GC_Z5,340.0,340.0,0.0
2,HE_Z5,180.0,180.0,0.0


In [45]:
if pnl_cmp_v2.equals(pnl_cmp_v2_answer):
    print('Congratulations')
else:
    print('Your answer: ')
    display(pnl_cmp_v2)
    print('My answer: ')
    display(pnl_cmp_v2_answer)
pnl_cmp_v2 = pnl_cmp_v2_answer.copy()

Congratulations


## Step 4 — Class Development

Create a class that can be used to do a simple reconcilation

In [83]:
from abc import ABC, abstractmethod

# =============== 1) Abstraction ===============

class RecordSource(ABC):
    """
    Abstract contract for any tabular source. 
    Subclasses must return a DataFrame with a COMMON schema via normalize().
    Target schema: ['id', 'qty'] where qty is signed.
    """

    @abstractmethod
    def get_data(self) -> pd.DataFrame:
        """Return raw DataFrame (whatever columns the source has)."""
        pass

    @abstractmethod
    def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
        """Map raw to common schema ['id','qty'] (signed)."""
        pass

# =============== 2) Inheritance ===============

class AdminSource(RecordSource):
    """
    Raw schema: ['record_id','quantity','sign'] 
      - quantity is unsigned, direction in 'sign' (+1 or -1).
    """

    def __init__(self, raw_df: pd.DataFrame):
        self.raw = raw_df

    def get_data(self) -> pd.DataFrame:
        return self.raw.copy()

    def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
        out = df.rename(columns={'record_id':'id', 'quantity':'quantity_abs'})
        out['qty'] = out['quantity_abs'] * out['sign']
        return out[['id','qty']]


class BrokerSource(RecordSource):
    """
    Raw schema: ['id','qty'] 
      - qty already signed
    """

    def __init__(self, raw_df: pd.DataFrame):
        self.raw = raw_df

    def get_data(self) -> pd.DataFrame:
        return self.raw.copy()

    def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[['id','qty']].copy()

# =============== 3) Composition ===============

class SimpleReconciler:
    def __init__(self, left: RecordSource, right: RecordSource):
        self.left = left
        self.right = right

    def breaks(self) -> pd.DataFrame:
        l_norm = self.left.normalize(self.left.get_data()).add_suffix('_L')
        r_norm = self.right.normalize(self.right.get_data()).add_suffix('_R')

        merged = l_norm.merge(
            r_norm,
            left_on='id_L',
            right_on='id_R',
            how='outer',
            validate='one_to_one'
        )

        merged = merged.rename(columns={'id_L':'id'}).drop(columns=['id_R'])
        merged['qty_diff'] = merged['qty_R'].fillna(0) - merged['qty_L'].fillna(0)
        return merged[['id','qty_L','qty_R','qty_diff']].sort_values('id')

In [84]:
admin_raw = pd.DataFrame({
    'record_id': ['A','B','C'],
    'quantity':  [  3,   2,   1],
    'sign':      [ +1,  +1,  -1]
})

broker_raw = pd.DataFrame({
    'id':  ['A','B','C'],
    'qty': [  3,   1,  -1]
})

admin  = AdminSource(admin_raw)
broker = BrokerSource(broker_raw)

recon = SimpleReconciler(admin, broker)
print(recon.breaks())

# Quick check of polymorphism
for src in [admin, broker]:
    print(f"\n{src.__class__.__name__} normalized:")
    print(src.normalize(src.get_data()))

  id  qty_L  qty_R  qty_diff
0  A      3      3         0
1  B      2      1        -1
2  C     -1     -1         0

AdminSource normalized:
  id  qty
0  A    3
1  B    2
2  C   -1

BrokerSource normalized:
  id  qty
0  A    3
1  B    1
2  C   -1
