In [7]:
import pandas as pd
import numpy as np

# ---------- Plan catalog ----------
plans = pd.DataFrame({
    "plan": ["Basic","Standard","Plus","Pro","Ultra"],
    "base_fee": [25.0, 40.0, 55.0, 70.0, 85.0],
    "min_included": [300, 600, 800, 1000, 1200],
    "data_included": [3.0, 6.0, 8.0, 10.0, 12.0],
    "min_overage_rate": [0.08, 0.05, 0.04, 0.03, 0.02],
    "data_overage_rate": [8.0, 6.0, 5.0, 4.0, 3.5],
})

# ---------- Usage facts ----------
n = 100_000  # >= 100k as required
rng = np.random.default_rng(42)

plan_probs = [0.35, 0.35, 0.15, 0.10, 0.05]  # skew toward Basic/Standard
region_vals = np.array(["West","East","South","North","Central"])
region_probs = [0.25, 0.25, 0.20, 0.20, 0.10]

df = pd.DataFrame({
    "customer_id": np.arange(1, n+1, dtype=np.int64),
    "plan": rng.choice(plans["plan"].values, size=n, p=plan_probs),
    "minutes_used": np.clip(rng.normal(loc=600, scale=200, size=n).round().astype(np.int64), 0, None),
    "data_gb": np.clip(rng.normal(loc=8.0, scale=4.0, size=n), 0, None).astype(float),
    "is_active": rng.random(n) < 0.85,
    "region": rng.choice(region_vals, size=n, p=region_probs),
})

# Sanity checks (optional)
assert set(df["plan"].unique()).issubset(set(plans["plan"].unique()))
assert df.shape[0] >= 100_000


In [8]:
from time import perf_counter

# Build a plan lookup dict for row-wise access (still "old" since we loop via apply)
plan_lookup = plans.set_index("plan")[[
    "base_fee","min_included","data_included","min_overage_rate","data_overage_rate"
]].to_dict("index")

def compute_monthly_bill_row(row):
    p = row["plan"]
    attrs = plan_lookup[p]
    base = attrs["base_fee"]
    min_inc = attrs["min_included"]
    dat_inc = attrs["data_included"]
    min_rate = attrs["min_overage_rate"]
    dat_rate = attrs["data_overage_rate"]

    min_over = max(row["minutes_used"] - min_inc, 0)
    dat_over = max(row["data_gb"] - dat_inc, 0.0)
    return base + min_over * min_rate + dat_over * dat_rate

# Baseline timing
t0 = perf_counter()

df_old = df.copy()  # operate on full data (no predicate pushdown)
df_old["monthly_bill"] = df_old.apply(compute_monthly_bill_row, axis=1)

# NOW filter (intentionally bad ordering for performance)
df_old = df_old[(df_old["is_active"]) & (df_old["region"] == "West")]

# Compute region total the slow way and merge back (not transform)
region_totals = (
    df_old.groupby("region", as_index=False)["monthly_bill"]
          .sum()
          .rename(columns={"monthly_bill": "region_bill_total"})
)
df_old = df_old.merge(region_totals, on="region", how="left")
df_old["customer_share"] = df_old["monthly_bill"] / df_old["region_bill_total"]

t_old = perf_counter() - t0

print(f"Old pipeline input rows (pre-filter): {len(df):,}")
print(f"Old pipeline rows after filter:      {len(df_old):,}")
print(f"Old pipeline runtime:                {t_old:.3f} s")


Old pipeline input rows (pre-filter): 100,000
Old pipeline rows after filter:      21,389
Old pipeline runtime:                0.433 s


In [61]:
from time import perf_counter

t0 = perf_counter()

mask = df["is_active"] & (df["region"] == "West")
df_new = df.loc[mask].copy()

plan_cols = plans[['plan', 'base_fee', 'min_included', 'data_included', 'min_overage_rate', 'data_overage_rate']]

df_new = df_new.merge(plan_cols, on='plan', how='left', validate="m:1", indicator=True)

# 1) Inspect join result
# print(df_new["_merge"].value_counts(dropna=False))
# results were clean, only on both, 0 on left or right

# 2) Hard-fail if any unmatched usage rows
if (df_new["_merge"] != "both").any():
    missing_keys = df_new.loc[df_new["_merge"] == "left_only", "plan"].unique()
    raise ValueError(f"Unmatched plan keys in usage: {missing_keys}")

req = ["base_fee","min_included","data_included","min_overage_rate","data_overage_rate"]

bad = df_new[req].isna().any(axis=1)
if bad.any():
    # Diagnostics
    print("Nulls by column:\n", df_new.loc[bad, req].isna().sum())
    bad_plans = df_new.loc[bad, "plan"].unique()
    raise ValueError(f"Joined attributes contain NaN for plans: {bad_plans}")

df_new = df_new.drop(columns="_merge")



# Ensure usage columns are numeric too
df_new["minutes_used"] = pd.to_numeric(df_new["minutes_used"], errors="raise")
df_new["data_gb"]      = pd.to_numeric(df_new["data_gb"], errors="raise")

df_new = df_new.astype({
    "base_fee": "float64",
    "min_included": "int64",           
    "data_included": "float64",
    "min_overage_rate": "float64",
    "data_overage_rate": "float64",
})


# vectorize after a merge on plan
df_new['monthly_bill'] = df_new['base_fee'] + (df_new['minutes_used'] - df_new['min_included']).clip(lower=0) * df_new['min_overage_rate'] + (df_new['data_gb'] - df_new['data_included']).clip(lower=0) * df_new['data_overage_rate']


# Per-region total via transform (no extra merge/materialization)
df_new["region_bill_total"] = df_new.groupby("region")["monthly_bill"].transform("sum")
df_new["customer_share"] = df_new["monthly_bill"] / df_new["region_bill_total"]

assert (df_new["monthly_bill"].notna()).all(), "monthly_bill has NaNs"
assert (df_new["monthly_bill"] >= df_new["base_fee"]).all(), "bill < base_fee found"

t_new = perf_counter() - t0

print(f"New pipeline input rows (pre-filter): {len(df):,}")
print(f"New pipeline rows after filter:      {len(df_new):,}")
print(f"New pipeline runtime:                {t_new:.3f} s")


New pipeline input rows (pre-filter): 100,000
New pipeline rows after filter:      21,389
New pipeline runtime:                0.037 s


In [69]:
import numpy as np
import pandas as pd
from time import perf_counter

# --- 0) Prereqs: row-wise function (baseline uses it) -------------------------
# If you already defined plan_lookup and compute_monthly_bill_row, reuse them.
plan_lookup = plans.set_index("plan")[[
    "base_fee","min_included","data_included","min_overage_rate","data_overage_rate"
]].to_dict("index")

def compute_monthly_bill_row(row):
    p = row["plan"]
    attrs = plan_lookup[p]
    base = attrs["base_fee"]
    min_inc = attrs["min_included"]
    dat_inc = attrs["data_included"]
    min_rate = attrs["min_overage_rate"]
    dat_rate = attrs["data_overage_rate"]
    min_over = max(row["minutes_used"] - min_inc, 0)
    dat_over = max(row["data_gb"] - dat_inc, 0.0)
    return base + min_over * min_rate + dat_over * dat_rate

# --- 1) Region -> category + memory effect -----------------------------------

# --- 1) Region -> category + memory effect (applied to real df) ---
mem_before_frame  = df.memory_usage(deep=True).sum()
mem_before_region = df["region"].memory_usage(deep=True)

df = df.copy()
df["region"] = df["region"].astype("category")

mem_after_frame   = df.memory_usage(deep=True).sum()
mem_after_region  = df["region"].memory_usage(deep=True)

print("Categorical cast (region) applied to shared df:")
print(f"  region bytes  before: {mem_before_region:,}")
print(f"  region bytes   after: {mem_after_region:,}")
print(f"  frame  bytes  before: {mem_before_frame:,}")
print(f"  frame  bytes   after: {mem_after_frame:,}")
print(f"  region savings: {mem_before_region - mem_after_region:,} bytes")
print(f"  frame  savings: {mem_before_frame  - mem_after_frame:,} bytes")



# --- 2) Pipelines -------------------------------------------------------------

def pipeline_old_apply_then_filter(df):
    t0 = perf_counter()
    df_old = df.copy()
    df_old["monthly_bill"] = df_old.apply(compute_monthly_bill_row, axis=1)
    df_old = df_old[(df_old["is_active"]) & (df_old["region"] == "West")].copy()
    region_totals = (df_old.groupby("region", as_index=False)["monthly_bill"]
                           .sum()
                           .rename(columns={"monthly_bill": "region_bill_total"}))
    df_old = df_old.merge(region_totals, on="region", how="left")
    df_old["customer_share"] = df_old["monthly_bill"] / df_old["region_bill_total"]
    t = perf_counter() - t0
    return df_old, t

def pipeline_pushdown_then_apply(df):
    t0 = perf_counter()
    df2 = df.loc[df["is_active"] & (df["region"] == "West")].copy()
    df2["monthly_bill"] = df2.apply(compute_monthly_bill_row, axis=1)
    df2["region_bill_total"] = df2.groupby("region")["monthly_bill"].transform("sum")
    df2["customer_share"] = df2["monthly_bill"] / df2["region_bill_total"]
    t = perf_counter() - t0
    return df2, t

def pipeline_vectorized(df, plans):
    t0 = perf_counter()
    # filter early
    base = df.loc[df["is_active"] & (df["region"] == "West"),
                  ["region","plan","minutes_used","data_gb","is_active"]].copy()

    # minimal plan columns, constrained merge + indicator for diagnostics
    plan_cols = plans[["plan","base_fee","min_included","data_included",
                       "min_overage_rate","data_overage_rate"]]
    out = base.merge(plan_cols, on="plan", how="left", validate="m:1", indicator=True)

    # join diagnostics
    merge_counts = out["_merge"].value_counts(dropna=False).to_dict()
    if (out["_merge"] != "both").any():
        bad_keys = out.loc[out["_merge"]=="left_only","plan"].unique()
        raise ValueError(f"Unmatched plan keys in usage: {bad_keys}")
    out = out.drop(columns="_merge")

    # required columns present
    req = ["base_fee","min_included","data_included","min_overage_rate","data_overage_rate"]
    bad = out[req].isna().any(axis=1)
    if bad.any():
        bad_plans = out.loc[bad,"plan"].unique()
        raise ValueError(f"Joined attributes contain NaN for plans: {bad_plans}")

    # numeric dtypes
    out["minutes_used"] = pd.to_numeric(out["minutes_used"], errors="raise")
    out["data_gb"]      = pd.to_numeric(out["data_gb"], errors="raise")
    out = out.astype({
        "base_fee": "float64",
        "min_included": "int64",
        "data_included": "float64",
        "min_overage_rate": "float64",
        "data_overage_rate": "float64",
    })

    # vectorized overage math
    min_over  = (out["minutes_used"] - out["min_included"]).clip(lower=0)
    data_over = (out["data_gb"]      - out["data_included"]).clip(lower=0)
    out["monthly_bill"] = (out["base_fee"]
                           + min_over  * out["min_overage_rate"]
                           + data_over * out["data_overage_rate"])

    # region totals + share with zero-guard
    out["region_bill_total"] = out.groupby("region")["monthly_bill"].transform("sum")
    zero_tot = out["region_bill_total"] == 0
    out["customer_share"] = np.where(zero_tot, 0.0, out["monthly_bill"] / out["region_bill_total"])

    # invariants
    assert out["monthly_bill"].notna().all()
    assert (out["monthly_bill"] >= out["base_fee"]).all()

    t = perf_counter() - t0
    return out, t, merge_counts

# --- 3) Run all on same input size (>=100k) -----------------------------------
# Ensure df has >=100k rows; if not, your numbers will differ from your target scale.

input_rows = len(df)
old_df,  t_old  = pipeline_old_apply_then_filter(df)
push_df, t_push = pipeline_pushdown_then_apply(df)
vec_df,  t_vec, merge_counts = pipeline_vectorized(df, plans)

# --- 4) Benchmark table + speedups -------------------------------------------
def fmt(s): 
    return f"{s:,.3f} s"

print("\n=== Benchmark (same input) ===")
print(f"Rows in:         {input_rows:,}")
print(f"Rows post-filter (old):  {len(old_df):,}")
print(f"Rows post-filter (push): {len(push_df):,}")
print(f"Rows post-filter (vec):  {len(vec_df):,}")
print("\nRuntimes:")
print(f"  Old baseline (apply then filter): {fmt(t_old)}")
print(f"  Pushdown + apply:                  {fmt(t_push)}")
print(f"  Vectorized:                        {fmt(t_vec)}")

print("\nSpeedups (higher is better):")
print(f"  Old / Vectorized:       {t_old / t_vec:,.2f}×")
print(f"  Push+Apply / Vectorized:{t_push / t_vec:,.2f}×")

print("\n_merge status (vectorized):", merge_counts)  # expect {'both': post-filter rows}

# --- 5) Equivalence spot-check (apply vs vectorized on a sample) -------------
# Sample 10–20 rows from the *vectorized* result and recompute the apply math for those same rows.
n = min(20, len(vec_df))
sample = vec_df.sample(n=n, random_state=42)

# Rebuild a small frame with the columns the row-function expects
check = sample[["plan","minutes_used","data_gb"]].copy()
# The row-wise function uses plan_lookup dict built from `plans` above
apply_vals = check.apply(compute_monthly_bill_row, axis=1).values
vec_vals   = sample["monthly_bill"].values

equal_mask = np.isclose(apply_vals, vec_vals, rtol=0, atol=1e-9)
print(f"\nEquivalence spot-check on {n} rows: {equal_mask.sum()}/{n} exact within 1e-9")
if not equal_mask.all():
    diffs = pd.DataFrame({
        "plan": sample["plan"].values,
        "apply": apply_vals,
        "vectorized": vec_vals,
        "delta": vec_vals - apply_vals
    })
    print("Mismatches (head):")
    print(diffs.loc[~equal_mask].head(5))


Categorical cast (region) applied to shared df:
  region bytes  before: 100,614
  region bytes   after: 100,614
  frame  bytes  before: 8,871,014
  frame  bytes   after: 8,871,014
  region savings: 0 bytes
  frame  savings: 0 bytes

=== Benchmark (same input) ===
Rows in:         100,000
Rows post-filter (old):  21,389
Rows post-filter (push): 21,389
Rows post-filter (vec):  21,389

Runtimes:
  Old baseline (apply then filter): 0.412 s
  Pushdown + apply:                  0.089 s
  Vectorized:                        0.009 s

Speedups (higher is better):
  Old / Vectorized:       46.22×
  Push+Apply / Vectorized:10.03×

_merge status (vectorized): {'both': 21389, 'left_only': 0, 'right_only': 0}

Equivalence spot-check on 20 rows: 20/20 exact within 1e-9


  region_totals = (df_old.groupby("region", as_index=False)["monthly_bill"]
  df2["region_bill_total"] = df2.groupby("region")["monthly_bill"].transform("sum")
  out["region_bill_total"] = out.groupby("region")["monthly_bill"].transform("sum")
