In [1]:
# Value Creation Bridge (aligned to Achleitner et al. 2010; Achleitner, Braun & Puche 2015; Söffge & Braun 2017)

# === Core Value-Bridge Items: times_money and tm_unlevered (exit-row div/cap injection) ===

from pathlib import Path
import pandas as pd
import numpy as np

# ---- helper ----
def find_upwards(rel_path: Path, max_up: int = 8) -> Path:
    """Locate ``rel_path`` below the working directory or one of its ancestors.

    The current working directory is probed first, then each parent in turn,
    for at most ``max_up`` levels above the starting point.

    Args:
        rel_path: Relative path to look for under each candidate directory.
        max_up: Number of ancestor levels to probe in addition to the cwd.

    Returns:
        The resolved path of the first existing match.

    Raises:
        FileNotFoundError: If no probed directory contains ``rel_path``; the
            message lists every location that was checked.
    """
    start = Path.cwd()
    search_roots = (start, *start.parents)[: max_up + 1]
    probes = [root / rel_path for root in search_roots]

    # Closest match wins: probes are ordered from the cwd outwards.
    hit = next((probe for probe in probes if probe.exists()), None)
    if hit is not None:
        return hit.resolve()

    raise FileNotFoundError(
        f"Couldn't locate '{rel_path.as_posix()}' from {start} by walking up {max_up} levels.\n"
        f"- Current working directory: {start}\n"
        f"- Checked: {[str(probe) for probe in probes]}"
    )

# Shared working file: <ValueCreation>/Data/working.csv (read here, overwritten further down).
TARGET_CSV = find_upwards(Path("ValueCreation")).joinpath("Data", "working.csv")

# ---- load + checks ----
# Identifiers are read as strings so they are never re-typed as numbers.
df = pd.read_csv(TARGET_CSV, dtype={"id": str, "deal_id": str})

# The check counts *distinct* ids per deal; it passes when each deal has two of them.
ids_per_deal = df.groupby("deal_id")["id"].nunique()
assert (ids_per_deal == 2).all(), "Each deal_id must have exactly 2 rows."

def num(s):
    """Convert ``s`` to a numeric dtype; entries that cannot be parsed become NaN."""
    converted = pd.to_numeric(s, errors="coerce")
    return converted

# Order within deal: 1 = entry (earliest), 2 = exit (latest)
# Dates that fail to parse become NaT; those rows get a NaN rank and match neither 1 nor 2.
df["_ref_dt"] = pd.to_datetime(df["reference_date"], errors="coerce")
df["_rank"] = df.groupby("deal_id")["_ref_dt"].rank(method="first", ascending=True)


def _at_rank(values, rank_val, label):
    """Return ``values`` for the rows whose ``_rank`` equals ``rank_val``, indexed by deal_id.

    Reads the module-level ``df``. ``values`` must be row-aligned with ``df``.
    The resulting Series is named ``label``.
    """
    rows = df["_rank"] == rank_val
    keyed = df.loc[rows, ["deal_id"]].assign(**{label: values[rows].values})
    return keyed.groupby("deal_id")[label].first()


eq = num(df["equity"])

# Entry / Exit equity
equity_entry = _at_rank(eq, 1, "equity_entry")
equity_exit = _at_rank(eq, 2, "equity_exit")

# ---- deal-level inputs (exit-row dividends & capital injections) ----
# Missing dividends / injections on the exit row are treated as zero.
div_exit = _at_rank(num(df["dividends"]), 2, "div_exit").fillna(0.0)
cap_exit = _at_rank(num(df["capital_injections"]), 2, "cap_exit").fillna(0.0)

# ---- dtype & presence guards ----
# Equity must be present on both rows: it anchors every ratio computed below.
assert equity_entry.notna().all(), "Equity missing on entry rows."
assert equity_exit.notna().all(),  "Equity missing on exit rows."

# Re-assert numeric dtype on the filled series.
div_exit = pd.to_numeric(div_exit, errors="coerce").fillna(0.0)
cap_exit = pd.to_numeric(cap_exit, errors="coerce").fillna(0.0)

# ---- average D/E as entry/exit two-point mean; clip at 0 for unlevering ----
# mean(axis=1) skips NaN, so a deal with only one D/E observation uses that single value.
der_series = num(df["de_ratio"])
der_entry = _at_rank(der_series, 1, "der")
der_exit = _at_rank(der_series, 2, "der")
der_avg = pd.concat([der_entry, der_exit], axis=1).mean(axis=1).clip(lower=0.0)

# ---- cost of debt: take the (deal-level) compounded total, not a mean ----
# Coerce to numeric like every other input of this cell: an object-dtype column would
# otherwise make the max-min subtraction raise, or leak strings into the
# tm_unlevered arithmetic further down.
cod_by_deal = pd.to_numeric(df["cost_of_debt"], errors="coerce").groupby(df["deal_id"])

# Despite the name, cod_avg is the first non-missing value per deal (not a mean);
# the spread check below confirms that the rows of a deal agree on it.
cod_avg = cod_by_deal.first()

# A deal without any usable value used to fail the spread check with a misleading
# "differs across rows" message; report the real cause instead.
assert cod_avg.notna().all(), "cost_of_debt missing or non-numeric for at least one deal."

cod_spread = cod_by_deal.max() - cod_by_deal.min()
assert (cod_spread.abs() < 1e-12).all(), "cost_of_debt differs across rows; confirm intent."

# ---- formulas (deal level) ----
# All operands are Series indexed by deal_id, so the arithmetic aligns by deal.
# NOTE(review): adding cap_exit to the gain and subtracting it from invested capital
# only makes economic sense if capital_injections is stored with a negative sign
# (cash paid in by the investor) — confirm against the data dictionary.
d_equity = equity_exit - equity_entry
net_capital_gain = d_equity + div_exit + cap_exit
invested_capital = equity_entry - cap_exit

# ---- invested capital guard: require strictly positive IC ----
# NaN compares False here, so deals whose invested capital is undefined are dropped too.
valid_ic = invested_capital > 0
if not valid_ic.all():
    bad = (~valid_ic).sum()
    print(f"Dropping {bad} deal(s) with invested_capital <= 0.")
    # Hard drop of invalid deals from all deal-level vectors:
    # (every vector is filtered with the same mask so they stay aligned)
    equity_entry      = equity_entry[valid_ic]
    equity_exit       = equity_exit[valid_ic]
    div_exit          = div_exit[valid_ic]
    cap_exit          = cap_exit[valid_ic]
    d_equity          = d_equity[valid_ic]
    net_capital_gain  = net_capital_gain[valid_ic]
    invested_capital  = invested_capital[valid_ic]
    der_avg           = der_avg[valid_ic]
    cod_avg           = cod_avg[valid_ic]

# Remove invalid deals from the row-level frame as well
# (runs unconditionally; it is a no-op when every deal passed the guard)
survivor_deals = set(invested_capital.index)  # index currently is deal_id for valid deals
df = df[df["deal_id"].isin(survivor_deals)].copy()


# times_money = net capital gain / invested capital.
# After the guard above invested_capital is strictly positive, so the `.where(... != 0)`
# mask is a belt-and-braces safeguard that never blanks a value.
with np.errstate(divide="ignore", invalid="ignore"):
    times_money = (net_capital_gain / invested_capital).where(invested_capital != 0)

# Unlevering: tm_unlevered = (TM + cost_of_debt * D/E) / (1 + D/E), with D/E the
# clipped two-point average. A deal with no D/E observation yields NaN here.
with np.errstate(divide="ignore", invalid="ignore"):
    tm_unlevered = (times_money + cod_avg * der_avg) / (1 + der_avg)

# Part of the levered multiple attributed to leverage.
leverage_effect = times_money - tm_unlevered

# ---- broadcast to both rows ----
req = ["d_equity","net_capital_gain","invested_capital","times_money","tm_unlevered","leverage_effect"]

# Assemble the per-deal table by label (deal_id index) rather than by position, so a
# vector with a different index shows up as NaN in the checks below instead of silently
# pairing values with the wrong deal.
deal_metrics = pd.DataFrame({
    "d_equity": d_equity,
    "net_capital_gain": net_capital_gain,
    "invested_capital": invested_capital,
    "times_money": times_money,
    "tm_unlevered": tm_unlevered,
    "leverage_effect": leverage_effect,
})
deal_metrics.index.name = "deal_id"

# Idempotence: this cell reads and overwrites the same CSV. If the file already holds
# metric columns from a previous run, merging would create `_x` / `_y` suffixed
# duplicates, so stale copies are dropped before the fresh values are attached.
helper_cols = ["_ref_dt", "_rank"]
stale_cols = [c for c in req if c in df.columns]
out = df.drop(columns=helper_cols + stale_cols).merge(
    deal_metrics, left_on="deal_id", right_index=True, how="left"
)


def _assert_deal_metrics_ok(frame):
    """Raise AssertionError unless ``frame`` has 2 ids per deal and complete metric columns."""
    # exactly 2 rows per surviving deal
    assert frame.groupby("deal_id")["id"].nunique().eq(2).all(), "Row cardinality changed."
    # no NaNs in core outputs
    assert set(req).issubset(frame.columns), "Missing expected columns."
    assert frame[req].notna().all().all(), "NaNs in deal-level outputs after merge."


# ---- persist + checks ----
# Validate in memory first: the target is also the input file, so a failed check must
# not leave a half-valid CSV behind.
_assert_deal_metrics_ok(out)
out.to_csv(TARGET_CSV, index=False)

# Round-trip check on what was actually written.
chk = pd.read_csv(TARGET_CSV, dtype={"deal_id": str})
_assert_deal_metrics_ok(chk)

print("Deal-level metrics added (exit-row div/cap applied):",
      ["d_equity","net_capital_gain","invested_capital","times_money","tm_unlevered","leverage_effect"])

# === Optional post-filter: drop short-hold deals with |TM| ~ 0 ===
APPLY_TM_SHORT_HOLD_FILTER = True   # toggle
TM_ABS_THRESHOLD = 0.10             # remove if |TM| < this AND holding period ≤ threshold
HOLD_DAYS_THRESHOLD = 365

if not APPLY_TM_SHORT_HOLD_FILTER:
    print("TM short-hold filter skipped (toggle off).")
else:
    # Work from the file on disk so the filter does not rely on in-memory state above.
    chk = pd.read_csv(TARGET_CSV, dtype={"id": str, "deal_id": str})

    # Holding period = span between the earliest and latest reference date of a deal.
    ref_dt = pd.to_datetime(chk["reference_date"], errors="coerce")
    g = chk.assign(_ref_dt=ref_dt).groupby("deal_id", as_index=True)
    first_dt = g["_ref_dt"].min()
    last_dt = g["_ref_dt"].max()
    hold_days = (last_dt - first_dt).dt.days

    # times_money is a deal-level value, so the first row of each deal is representative.
    tm_per_deal = pd.to_numeric(g["times_money"].first(), errors="coerce")

    # A deal is removed only when BOTH conditions hold.
    tiny_tm = tm_per_deal.abs().lt(TM_ABS_THRESHOLD)
    short_hold = hold_days.le(HOLD_DAYS_THRESHOLD)
    to_drop_mask = tiny_tm & short_hold
    drop_ids = set(tm_per_deal.index[to_drop_mask])

    if not drop_ids:
        print("TM short-hold filter applied: no deals met removal criteria.")
    else:
        before = int(chk["deal_id"].nunique())
        keep_rows = ~chk["deal_id"].isin(drop_ids)
        cleaned = chk.loc[keep_rows].copy()
        cleaned.to_csv(TARGET_CSV, index=False)

        # Post-write integrity checks on the file that was just written.
        chk2 = pd.read_csv(TARGET_CSV, dtype={"deal_id": str})
        ids_left_per_deal = chk2.groupby("deal_id")["id"].nunique()
        assert (ids_left_per_deal == 2).all(), "Row cardinality changed improperly."
        req = ["d_equity","net_capital_gain","invested_capital","times_money","tm_unlevered","leverage_effect"]
        assert set(req) <= set(chk2.columns), "Missing expected columns after filter."
        assert not chk2[req].isna().any().any(), "NaNs in deal-level outputs after filter."

        after = int(chk2["deal_id"].nunique())
        print(f"TM short-hold filter applied: removed {before - after} deal(s) "
              f"(abs(TM) < {TM_ABS_THRESHOLD} and hold ≤ {HOLD_DAYS_THRESHOLD} days).")

# Final tally of distinct deals per holding status, read back from the persisted file.
g = pd.read_csv(TARGET_CSV, dtype={"deal_id": str})
deal_status_pairs = g.drop_duplicates(["deal_id", "holding_status"])
by_status = deal_status_pairs.groupby("holding_status")["deal_id"].nunique()

# A status that does not occur in the data counts as zero.
n_exited = int(by_status.get("exited", 0))
n_unexited = int(by_status.get("unexited", 0))
n_total = g["deal_id"].nunique()
print(
    "unique_deals_exited:", n_exited,
    "unique_deals_unexited:", n_unexited,
    "unique_deals_total:", n_total,
)


Deal-level metrics added (exit-row div/cap applied): ['d_equity', 'net_capital_gain', 'invested_capital', 'times_money', 'tm_unlevered', 'leverage_effect']
TM short-hold filter applied: removed 9 deal(s) (abs(TM) < 0.1 and hold ≤ 365 days).
unique_deals_exited: 210 unique_deals_unexited: 135 unique_deals_total: 345


In [2]:
# === Absolute Value Drivers: Multiple, FCF, EBITDA (PBA-aligned) ===

from pathlib import Path
import pandas as pd
import numpy as np

# -- helper --
def find_upwards(rel_path: Path, max_up: int = 8) -> Path:
    """Find ``rel_path`` relative to the working directory or one of its ancestors.

    Args:
        rel_path: Relative path to look for under each candidate directory.
        max_up: How many ancestor levels to probe beyond the cwd itself.

    Returns:
        The resolved path of the nearest existing match.

    Raises:
        FileNotFoundError: If none of the probed locations exists; the message
            enumerates them.
    """
    origin = Path.cwd()
    ladder = (origin, *origin.parents)[: max_up + 1]
    attempts = [rung / rel_path for rung in ladder]

    # Probed from the cwd outwards, so the nearest match is returned.
    found = next((attempt for attempt in attempts if attempt.exists()), None)
    if found is not None:
        return found.resolve()

    raise FileNotFoundError(
        f"Couldn't locate '{rel_path.as_posix()}' from {origin} by walking up {max_up} levels.\n"
        f"- CWD: {origin}\n"
        f"- Checked: {[str(attempt) for attempt in attempts]}"
    )

# Shared working file: <ValueCreation>/Data/working.csv.
TARGET_CSV = find_upwards(Path("ValueCreation")).joinpath("Data", "working.csv")

# -- load + guarantees --
df = pd.read_csv(TARGET_CSV, dtype={"id": str, "deal_id": str})
distinct_ids = df.groupby("deal_id")["id"].nunique()
assert (distinct_ids == 2).all(), "Each deal_id must have exactly 2 rows."

# -- order within deal: 1 = entry (earliest), 2 = exit/latest --
# Unparseable dates become NaT and receive a NaN rank.
parsed_dates = pd.to_datetime(df["reference_date"], errors="coerce")
df["_ref_dt"] = parsed_dates
df["_rank"] = parsed_dates.groupby(df["deal_id"]).rank(method="first", ascending=True)

def num(s):
    """Coerce ``s`` to numbers, turning anything unparseable into NaN."""
    as_numeric = pd.to_numeric(s, errors="coerce")
    return as_numeric

# Row-level numeric views of the driver columns (unparseable entries become NaN).
xe, eb, rv, mg, nd, div, cap = (
    num(df[column])
    for column in (
        "xebitda",
        "ebitda",
        "revenue",
        "ebitda_margin",
        "net_debt",
        "dividends",
        "capital_injections",
    )
)

# -- pick entry / exit values per deal --
def pick(series, rank_val, name):
    """Return ``series`` at the rows ranked ``rank_val`` within their deal, indexed by deal_id.

    Reads the module-level ``df`` (its ``_rank`` and ``deal_id`` columns);
    ``series`` must be row-aligned with it. The result is a Series named ``name``.
    """
    at_rank = df["_rank"] == rank_val
    keyed = df.loc[at_rank, ["deal_id"]]
    keyed = keyed.assign(**{name: series[at_rank].values})
    return keyed.groupby("deal_id")[name].first()

# Entry (rank 1) / exit (rank 2) value of each driver, one Series per deal_id.
xebitda_entry, xebitda_exit = pick(xe, 1, "xebitda_entry"), pick(xe, 2, "xebitda_exit")
ebitda_entry, ebitda_exit = pick(eb, 1, "ebitda_entry"), pick(eb, 2, "ebitda_exit")
revenue_entry, revenue_exit = pick(rv, 1, "revenue_entry"), pick(rv, 2, "revenue_exit")
margin_entry, margin_exit = pick(mg, 1, "margin_entry"), pick(mg, 2, "margin_exit")
net_debt_entry, net_debt_exit = pick(nd, 1, "net_debt_entry"), pick(nd, 2, "net_debt_exit")

# Dividends and capital injections are taken from the exit row only.
div_exit = pick(div, 2, "div_exit")
cap_exit = pick(cap, 2, "cap_exit")

# -- driver sanity and required-data mask (drop undefined cores) --
# Non-finite multiples / margins (±inf, presumably from a zero denominator upstream —
# TODO confirm) are treated as missing.
xebitda_entry  = xebitda_entry.where(np.isfinite(xebitda_entry))
xebitda_exit   = xebitda_exit.where(np.isfinite(xebitda_exit))
margin_entry   = margin_entry.where(np.isfinite(margin_entry))
# margin_exit feeds d_margin (and through it margin_effect and
# sales_margin_combination_effect), so it needs the same treatment as margin_entry.
margin_exit    = margin_exit.where(np.isfinite(margin_exit))

# A deal is kept only if every driver used by the effect formulas is defined.
need = pd.Series(True, index=xebitda_entry.index)
need &= xebitda_entry.notna()
need &= xebitda_exit.notna()
# Previously only margin_entry was required: a deal with a missing exit margin slipped
# through, produced NaN effects, and tripped the NaN assertion after the CSV had
# already been overwritten.
need &= margin_entry.notna() & margin_exit.notna()
need &= ebitda_entry.notna() & ebitda_exit.notna()
need &= revenue_entry.notna() & revenue_exit.notna()
need &= net_debt_entry.notna() & net_debt_exit.notna()
need &= div_exit.notna() & cap_exit.notna()

if not need.all():
    dropped = int((~need).sum())
    print(f"Dropping {dropped} deal(s) due to undefined core drivers.")
    # Filter every deal-level vector with the same mask so they stay aligned.
    xebitda_entry  = xebitda_entry[need]
    xebitda_exit   = xebitda_exit[need]
    ebitda_entry   = ebitda_entry[need]
    ebitda_exit    = ebitda_exit[need]
    revenue_entry  = revenue_entry[need]
    revenue_exit   = revenue_exit[need]
    margin_entry   = margin_entry[need]
    margin_exit    = margin_exit[need]
    net_debt_entry = net_debt_entry[need]
    net_debt_exit  = net_debt_exit[need]
    div_exit       = div_exit[need]
    cap_exit       = cap_exit[need]
    # Drop the same deals from the row-level frame.
    survivor_deals = set(need[need].index)
    df = df[df["deal_id"].isin(survivor_deals)].copy()

# -- deltas (exit - entry); deleveraging defined as entry − exit (positive if debt reduced) --
# All operands are per-deal Series indexed by deal_id.
d_multiple = xebitda_exit - xebitda_entry
d_ebitda   = ebitda_exit  - ebitda_entry
d_revenue  = revenue_exit - revenue_entry
d_margin   = margin_exit  - margin_entry
d_debt     = net_debt_entry - net_debt_exit

# -- effects (PBA absolute) --
# Main bridge: multiple change valued at entry EBITDA, EBITDA change valued at the entry
# multiple, and their cross term; together they equal exit EV minus entry EV
# (EV = multiple × EBITDA, as computed below).
multiple_effect = d_multiple * ebitda_entry
# NOTE(review): cap_exit is added with a plus sign, which presumes capital injections
# are stored as negative amounts — confirm.
fcf_effect = d_debt + div_exit + cap_exit
multiple_ebitda_combination_effect = d_multiple * d_ebitda
ebitda_effect = d_ebitda * xebitda_entry
# Sub-split of ebitda_effect into sales, margin and cross term. The three add up to
# ebitda_effect only if ebitda == revenue * ebitda_margin in the data — TODO confirm.
sales_effect = d_revenue * margin_entry * xebitda_entry
margin_effect = d_margin * revenue_entry * xebitda_entry
sales_margin_combination_effect = d_revenue * d_margin * xebitda_entry

# -- reconciliation to equity bridge (diagnostic residual) --
ev_entry = xebitda_entry * ebitda_entry
ev_exit  = xebitda_exit  * ebitda_exit
delta_ev = ev_exit - ev_entry

# Both sides below are built from the same drivers, so the residual is algebraically zero
# and only captures floating-point rounding; it does not compare against the
# net_capital_gain column stored in the file.
net_cap_gain_implied = delta_ev + (net_debt_entry - net_debt_exit) + div_exit + cap_exit
sum_effects = multiple_effect + ebitda_effect + multiple_ebitda_combination_effect + fcf_effect
effects_residual = sum_effects - net_cap_gain_implied
# A zero denominator is replaced by NaN so the relative residual is undefined
# rather than infinite for those deals.
effects_residual_rel = effects_residual / net_cap_gain_implied.replace(0, np.nan).abs()

# -- bundle per-deal results and broadcast to both rows --
req = [
    "multiple_effect","fcf_effect","multiple_ebitda_combination_effect",
    "ebitda_effect","sales_effect","margin_effect","sales_margin_combination_effect",
    "effects_residual","effects_residual_rel"
]

# Assemble by label (deal_id index) rather than by position, so a vector with a different
# index surfaces as NaN in the checks below instead of being paired with the wrong deal.
deal_effects = pd.DataFrame({
    "multiple_effect": multiple_effect,
    "fcf_effect": fcf_effect,
    "multiple_ebitda_combination_effect": multiple_ebitda_combination_effect,
    "ebitda_effect": ebitda_effect,
    "sales_effect": sales_effect,
    "margin_effect": margin_effect,
    "sales_margin_combination_effect": sales_margin_combination_effect,
    "effects_residual": effects_residual,
    # Undefined relative residual (zero implied gain) is stored as 0.
    "effects_residual_rel": effects_residual_rel.fillna(0.0),
})
deal_effects.index.name = "deal_id"

# Idempotence: this cell reads and overwrites the same CSV. Effect columns left over from
# a previous run would make the merge emit `_x` / `_y` suffixed duplicates, so they are
# dropped before the fresh values are attached.
helper_cols = ["_ref_dt", "_rank"]
stale_cols = [c for c in req if c in df.columns]
out = df.drop(columns=helper_cols + stale_cols).merge(
    deal_effects, left_on="deal_id", right_index=True, how="left"
)


def _assert_deal_effects_ok(frame):
    """Raise AssertionError unless ``frame`` has 2 ids per deal and complete effect columns."""
    assert frame.groupby("deal_id")["id"].nunique().eq(2).all(), "Row cardinality changed."
    assert set(req).issubset(frame.columns), "Missing effect columns."
    assert frame[req].notna().all().all(), "NaNs in effects after merge."


# -- persist + checks --
# Validate in memory first: the target is also the input file, so a failed check must not
# leave a broken CSV behind.
_assert_deal_effects_ok(out)
out.to_csv(TARGET_CSV, index=False)

# Round-trip check on what was actually written.
chk = pd.read_csv(TARGET_CSV, dtype={"deal_id": str})
_assert_deal_effects_ok(chk)

# A deal is flagged only when BOTH the absolute and the relative residual exceed tolerance.
bad_rec = (chk.groupby("deal_id")["effects_residual"].first().abs() > 1e-6) & \
          (chk.groupby("deal_id")["effects_residual_rel"].first().abs() > 1e-6)
if bad_rec.any():
    print(f"Warning: {int(bad_rec.sum())} deal(s) fail effect reconciliation (abs/rel tolerance).")

print("Deal-level effects added:",
      ["multiple_effect","fcf_effect","multiple_ebitda_combination_effect",
       "ebitda_effect","sales_effect","margin_effect","sales_margin_combination_effect",
       "effects_residual","effects_residual_rel"])


Deal-level effects added: ['multiple_effect', 'fcf_effect', 'multiple_ebitda_combination_effect', 'ebitda_effect', 'sales_effect', 'margin_effect', 'sales_margin_combination_effect', 'effects_residual', 'effects_residual_rel']


In [3]:
# === TMU-scaled contributions + pct_of_tm_* columns ===
from pathlib import Path
import pandas as pd
import numpy as np

def find_upwards(rel_path: Path, max_up: int = 8) -> Path:
    """Return the resolved location of ``rel_path`` under the cwd or a nearby ancestor.

    The working directory is tried first, then up to ``max_up`` parent levels.

    Raises:
        FileNotFoundError: If ``rel_path`` exists under none of the probed directories.
    """
    origin = Path.cwd()
    ladder = (origin, *origin.parents)[: max_up + 1]
    match = next((rung / rel_path for rung in ladder if (rung / rel_path).exists()), None)
    if match is None:
        raise FileNotFoundError(f"Couldn't locate {rel_path} from {Path.cwd()}")
    return match.resolve()

# Shared working file: <ValueCreation>/Data/working.csv.
TARGET_CSV = find_upwards(Path("ValueCreation")).joinpath("Data", "working.csv")

# ---- load + guarantees ----
df = pd.read_csv(TARGET_CSV, dtype={"id": str, "deal_id": str})
id_count_per_deal = df.groupby("deal_id")["id"].nunique()
assert (id_count_per_deal == 2).all(), "Each deal_id must have exactly 2 rows."

def num(s):
    """Convert ``s`` to a numeric dtype; entries that cannot be parsed become NaN."""
    # A named function instead of a lambda bound to a name: same call signature,
    # consistent with the `num` helper of the earlier cells, readable in tracebacks.
    return pd.to_numeric(s, errors="coerce")

# Absolute effects (currency)
# abs_main: the four effects whose sum is checked against times_money in the QA step;
# abs_sub: the sales / margin / cross-term split of ebitda_effect.
abs_main = ["multiple_effect","fcf_effect","multiple_ebitda_combination_effect","ebitda_effect"]
abs_sub  = ["sales_effect","margin_effect","sales_margin_combination_effect"]
all_abs  = abs_main + abs_sub

# Row-level numeric views (each deal's value is repeated on both of its rows).
IC  = num(df["invested_capital"])
TM  = num(df["times_money"])
TMU = num(df["tm_unlevered"])

# Remove any legacy tm_contrib_* columns if present
# (the prefix test does not match the tmu_contrib_* columns written below)
tm_cols_legacy = [c for c in df.columns if c.startswith("tm_contrib_")]
if tm_cols_legacy:
    df = df.drop(columns=tm_cols_legacy)

# ---- (1) Compute levered contributions (in memory only) ----
# Each absolute effect divided by invested capital, i.e. expressed in times-money units.
levered_contrib = {}
for c in all_abs:
    levered_contrib[c] = num(df[c]) / IC

# ---- (2) Proportional rescale to TMU ----
# Every contribution is multiplied by TMU / TM. Rows with |TM| <= eps get a NaN scale.
eps = 1e-12
valid_tm = TM.abs() > eps
scale = pd.Series(np.nan, index=df.index, dtype=float)
scale[valid_tm] = (TMU / TM)[valid_tm]

for c in all_abs:
    df[f"tmu_contrib_{c}"] = levered_contrib[c] * scale

# Optional: if TM≈0 and TMU≈0, set contributions to 0
# (rows with TM≈0 but TMU not ≈0 keep NaN contributions)
zero_tm_but_zero_tmu = (~valid_tm) & (TMU.abs() <= eps)
if zero_tm_but_zero_tmu.any():
    for c in all_abs:
        df.loc[zero_tm_but_zero_tmu, f"tmu_contrib_{c}"] = 0.0

# ---- (3) pct_of_tm_* = tmu_contrib_* / TM; guard |TM|~0 ----
# NOTE(review): despite the `pct_` prefix the value is stored as a plain ratio — no
# factor of 100 is applied here. Confirm that downstream consumers expect a fraction.
# Rows with |TM| <= eps are set to NaN.
for c in all_abs:
    pct_col = f"pct_of_tm_{c}"
    tmu_col = f"tmu_contrib_{c}"
    df[pct_col] = np.where(valid_tm, (df[tmu_col] / TM), np.nan)

# ---- (4) Reorder: place each pct_of_tm_* immediately after its tmu_contrib_* ----
# Build new order while preserving other columns' relative order
cols = list(df.columns)

# pct columns that have a tmu_contrib_* partner are emitted together with that partner,
# so they are skipped wherever else they occur. The previous loop only skipped them when
# they came AFTER the partner; a pct column positioned before its partner (e.g. in a file
# written by an older layout) was appended twice, giving duplicate column labels.
paired_pct = {
    f"pct_of_tm_{c.removeprefix('tmu_contrib_')}"
    for c in cols
    if c.startswith("tmu_contrib_")
}

new_cols = []
for col in cols:
    if col in paired_pct:
        continue  # placed right after its tmu_contrib_* partner below
    new_cols.append(col)
    if col.startswith("tmu_contrib_"):
        pct_col = f"pct_of_tm_{col.removeprefix('tmu_contrib_')}"
        if pct_col in df.columns:
            new_cols.append(pct_col)

df = df[new_cols]

# ---- (5) QA identities (recompute levered/TMU sums; nothing written from these intermediates) ----
def _one_percent_check(target, rebuilt):
    """Compare ``target`` with ``rebuilt`` row by row.

    Returns ``(diff, tolerance, row_ok)`` where ``diff = target - rebuilt``, the tolerance
    is 1% of ``|target|`` with a floor of 1e-9, and ``row_ok`` is True where the difference
    is finite and within tolerance.
    """
    diff = target - rebuilt
    tolerance = np.maximum(1e-9, 0.01 * target.abs())
    row_ok = (diff.abs() <= tolerance) & np.isfinite(diff)
    return diff, tolerance, row_ok


# (a) Levered identity: main contributions (effect / IC) should add up to TM.
#     min_count=1 keeps the sum NaN when every addend is NaN.
tm_from_contribs = pd.DataFrame({c: levered_contrib[c] for c in abs_main}).sum(axis=1, min_count=1)
diff_tm, tol_tm, ok_tm_row = _one_percent_check(TM, tm_from_contribs)

# (b) Unlevered identity: rescaled main contributions should add up to TMU.
tmu_main_cols = [f"tmu_contrib_{c}" for c in abs_main]
tmu_from_contribs = df[tmu_main_cols].sum(axis=1, min_count=1)
diff_tmu, tol_tmu, ok_tmu_row = _one_percent_check(TMU, tmu_from_contribs)

# (c) EBITDA sub-breakdown: sales + margin + cross term should add up to the EBITDA contribution.
tmu_sub_cols = [
    "tmu_contrib_sales_effect",
    "tmu_contrib_margin_effect",
    "tmu_contrib_sales_margin_combination_effect",
]
tmu_contrib_ebitda = df["tmu_contrib_ebitda_effect"]
tmu_contrib_subsum = df[tmu_sub_cols].sum(axis=1, min_count=1)
diff_tmu_ebitda, tol_tmu_ebitda, ok_tmu_ebitda_row = _one_percent_check(
    tmu_contrib_ebitda, tmu_contrib_subsum
)

# A deal passes only if all of its rows pass.
deal_key = df["deal_id"]
ok_tm_deal = ok_tm_row.groupby(deal_key).all()
ok_tmu_deal = ok_tmu_row.groupby(deal_key).all()
ok_tmu_ebitda_deal = ok_tmu_ebitda_row.groupby(deal_key).all()

# Persist QA diffs for diagnostics
qa_columns = {
    "qa_tm_diff": diff_tm,
    "qa_tmu_diff": diff_tmu,
    "qa_tmu_ebitda_sub_diff": diff_tmu_ebitda,
}
for qa_name, qa_diff in qa_columns.items():
    df[qa_name] = qa_diff

# ---- persist ----
df.to_csv(TARGET_CSV, index=False)

# ---- summaries ----
def summarize(name, mask):
    """Print how many entries of the boolean ``mask`` are True (within ±1%) and False (outside)."""
    inside = int(mask.sum())
    outside = int((~mask).sum())
    print(f"{name}: deals within ±1% = {inside} | outside ±1% = {outside}")

# One summary line per QA identity, in the order the checks were computed.
for qa_label, qa_mask in (
    ("TM (levered) identity", ok_tm_deal),
    ("TM_unlevered (sum of tmu_contrib main effects)", ok_tmu_deal),
    ("EBITDA sub-breakdown on TMU scale", ok_tmu_ebitda_deal),
):
    summarize(qa_label, qa_mask)

print("Wrote tmu_contrib_* and pct_of_tm_* (percent of levered TM) columns.")


TM (levered) identity: deals within ±1% = 344 | outside ±1% = 1
TM_unlevered (sum of tmu_contrib main effects): deals within ±1% = 344 | outside ±1% = 1
EBITDA sub-breakdown on TMU scale: deals within ±1% = 345 | outside ±1% = 0
Wrote tmu_contrib_* and pct_of_tm_* (percent of levered TM) columns.
