In [0]:
import numpy as np
import pandas as pd
from pyspark.sql import functions as F

# SciPy is commonly available on Databricks runtimes; if not, install it.
# If you prefer, run this in a separate cell:
# %pip install scipy
from scipy import stats  # Welch test + t distribution (ppf/sf)

In [0]:
# 0) Configuration
# -----------------------------
TABLE = "workspace.pilot_class_data.fact_questionnaire_long"

# School grouping rule from your study design
UNDER_SCHOOLS = {"school_b", "school_e"}
RESOURCED_SCHOOLS = {"school_c", "school_d"}

# Analysis scope
Q_MIN, Q_MAX = 1, 12
ALPHA = 0.05              # 95% CI
REQUIRE_COMPLETE_Q1_12 = False   # set True if you want only respondents with all 12 answers

In [0]:
# 1) Load data from Unity Catalog / workspace table
# Why: spark.table is the standard way to access a registered table as a DataFrame.
# -----------------------------
df = spark.table(TABLE)  # SparkSession.table returns a DataFrame 

expected_cols = {"respondent_key", "ai_used", "school_id", "question_num", "score"}
missing = expected_cols - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns: {missing}. Found: {df.columns}")

# Ensure correct types and keep only relevant fields
df = (
    df.select(
        F.col("respondent_key"),
        F.col("ai_used"),
        F.col("school_id"),
        F.col("question_num").cast("int").alias("question_num"),
        F.col("score").cast("double").alias("score"),
    )
)

In [0]:
# 2) Add school_group (Under vs Resourced)
# Why: Gap Reduction / Equity metrics need grouping by school resources.
# -----------------------------
df = (
    df.withColumn(
        "school_group",
        F.when(F.col("school_id").isin(list(UNDER_SCHOOLS)), F.lit("Under"))
         .when(F.col("school_id").isin(list(RESOURCED_SCHOOLS)), F.lit("Resourced"))
         .otherwise(F.lit(None)),
    )
    .filter(F.col("school_group").isNotNull())
)

In [0]:
# 3) Restrict to Q1–Q12 and aggregate to respondent-level mean
# Why: each respondent answered 12 items; using respondent mean avoids pseudo-replication.
# -----------------------------
q12 = df.filter((F.col("question_num").between(Q_MIN, Q_MAX)) & F.col("score").isNotNull())

resp = (
    q12.groupBy("respondent_key", "ai_used", "school_id", "school_group")
       .agg(
           F.avg("score").alias("resp_mean_q1_12"),
           F.countDistinct("question_num").alias("n_answered_q1_12"),
       )
)

if REQUIRE_COMPLETE_Q1_12:
    resp = resp.filter(F.col("n_answered_q1_12") == (Q_MAX - Q_MIN + 1))

# Bring the (small) respondent-level table to pandas for stats
pdf = resp.toPandas()

# Basic sanity
if pdf.empty:
    raise ValueError("No data after filtering. Check question_num range, score nulls, or school_id values.")

In [0]:
# 4) Utility: Welch t-test + CI from summary stats
# Why: Welch handles unequal variances and unequal sample sizes (common in field data).
# SciPy’s ttest_ind(equal_var=False) is Welch’s t-test. 
# -----------------------------
def welch_from_summary(mean_a, var_a, n_a, mean_b, var_b, n_b, alpha=0.05):
    """
    Returns Welch t-test results for (A - B) using summary stats.
    """
    if n_a < 2 or n_b < 2:
        return None

    se = np.sqrt(var_a / n_a + var_b / n_b)

    # Welch–Satterthwaite df for difference of means
    num = (var_a / n_a + var_b / n_b) ** 2
    den = ((var_a / n_a) ** 2) / (n_a - 1) + ((var_b / n_b) ** 2) / (n_b - 1)
    df = num / den

    t = (mean_a - mean_b) / se
    p = 2 * stats.t.sf(np.abs(t), df)  # two-tailed p-value using survival function

    tcrit = stats.t.ppf(1 - alpha / 2, df)
    diff = mean_a - mean_b
    ci_low = diff - tcrit * se
    ci_high = diff + tcrit * se

    return {
        "estimate": diff,
        "t": t,
        "df": df,
        "p": p,
        "se": se,
        "ci_low": ci_low,
        "ci_high": ci_high,
    }


In [0]:
# 5) Utility: Welch-style test for a linear contrast of 4 independent means
# Why: Gap Reduction / Equity DiD is a 4-cell contrast of means; variance adds across cells.
# Welch–Satterthwaite equation gives effective df for sum of variance terms. 
# -----------------------------
def welch_linear_contrast(groups, weights, alpha=0.05):
    """
    groups: list of dicts with keys {mean, var, n}
    weights: same length; contrast estimate = sum_i w_i * mean_i

    Variance term per group: v_i = (w_i^2) * var_i / n_i
    df approx: (sum v_i)^2 / sum (v_i^2 / (n_i - 1))  (Welch–Satterthwaite)
    """
    if len(groups) != len(weights):
        raise ValueError("groups and weights must have same length")

    # Must have >=2 per group to estimate variance
    if any(g["n"] < 2 for g in groups):
        return None

    estimate = sum(w * g["mean"] for w, g in zip(weights, groups))

    v_terms = [ (w**2) * g["var"] / g["n"] for w, g in zip(weights, groups) ]
    se = np.sqrt(sum(v_terms))

    num = (sum(v_terms))**2
    den = sum((v**2) / (g["n"] - 1) for v, g in zip(v_terms, groups))
    df = num / den

    t = estimate / se
    p = 2 * stats.t.sf(np.abs(t), df)

    tcrit = stats.t.ppf(1 - alpha / 2, df)
    ci_low = estimate - tcrit * se
    ci_high = estimate + tcrit * se

    return {
        "estimate": estimate,
        "t": t,
        "df": df,
        "p": p,
        "se": se,
        "ci_low": ci_low,
        "ci_high": ci_high,
    }

In [0]:
# 6) Compute Experience Lift (overall): AI - Baseline
# Why: this is your headline effect on learning experience (Q1–Q12).
# -----------------------------
def summary(series: pd.Series):
    s = series.dropna().astype(float)
    return {"n": int(s.shape[0]), "mean": float(s.mean()), "var": float(s.var(ddof=1))}

sY = summary(pdf.loc[pdf["ai_used"] == "Y", "resp_mean_q1_12"])
sN = summary(pdf.loc[pdf["ai_used"] == "N", "resp_mean_q1_12"])

exp_lift = welch_from_summary(
    mean_a=sY["mean"], var_a=sY["var"], n_a=sY["n"],
    mean_b=sN["mean"], var_b=sN["var"], n_b=sN["n"],
    alpha=ALPHA
)


In [0]:
# 7) Compute Equity metrics: Gap Reduction and Equity DiD (absolute)
# Define cell means:
#   UY = Under + AI, UN = Under + NoAI, RY = Resourced + AI, RN = Resourced + NoAI
#
# Equity Gap (Baseline) = RN - UN
# Equity Gap (AI)       = RY - UY
# Gap Reduction         = (RN - UN) - (RY - UY)
#
# Equity DiD (Under lift - Resourced lift)
# = (UY - UN) - (RY - RN) = same algebraically as Gap Reduction (abs)
# -----------------------------
cell = (
    pdf.groupby(["ai_used", "school_group"])["resp_mean_q1_12"]
       .agg(["count", "mean", lambda x: x.var(ddof=1)])
       .rename(columns={"<lambda_0>": "var"})
)

def get_cell(ai_used, grp):
    row = cell.loc[(ai_used, grp)]
    return {"n": int(row["count"]), "mean": float(row["mean"]), "var": float(row["var"])}

RN = get_cell("N", "Resourced")
UN = get_cell("N", "Under")
RY = get_cell("Y", "Resourced")
UY = get_cell("Y", "Under")

gap_baseline = RN["mean"] - UN["mean"]
gap_ai = RY["mean"] - UY["mean"]
gap_reduction_abs = gap_baseline - gap_ai
gap_reduction_pct = (gap_reduction_abs / gap_baseline) if gap_baseline != 0 else np.nan

# Contrast weights for Gap Reduction / Equity DiD:
# (RN - UN) - (RY - UY)  => +1*RN + (-1)*UN + (-1)*RY + (+1)*UY
contrast_groups = [RN, UN, RY, UY]
contrast_weights = [1, -1, -1, 1]

gap_reduction_test = welch_linear_contrast(contrast_groups, contrast_weights, alpha=ALPHA)

In [0]:
# 8) Bootstrap CI for Gap Reduction (abs and %)
# Why: Likert-ish data can be non-normal; bootstrap is a good sensitivity check.
# -----------------------------
def bootstrap_gap_reduction(pdf_resp, n_boot=5000, seed=42):
    rng = np.random.default_rng(seed)

    uy = pdf_resp[(pdf_resp.ai_used=="Y") & (pdf_resp.school_group=="Under")]["resp_mean_q1_12"].to_numpy()
    un = pdf_resp[(pdf_resp.ai_used=="N") & (pdf_resp.school_group=="Under")]["resp_mean_q1_12"].to_numpy()
    ry = pdf_resp[(pdf_resp.ai_used=="Y") & (pdf_resp.school_group=="Resourced")]["resp_mean_q1_12"].to_numpy()
    rn = pdf_resp[(pdf_resp.ai_used=="N") & (pdf_resp.school_group=="Resourced")]["resp_mean_q1_12"].to_numpy()

    out_abs = np.empty(n_boot, dtype=float)
    out_pct = np.empty(n_boot, dtype=float)

    for i in range(n_boot):
        mu_rn = rng.choice(rn, size=rn.size, replace=True).mean()
        mu_un = rng.choice(un, size=un.size, replace=True).mean()
        mu_ry = rng.choice(ry, size=ry.size, replace=True).mean()
        mu_uy = rng.choice(uy, size=uy.size, replace=True).mean()

        gb = mu_rn - mu_un
        ga = mu_ry - mu_uy
        gr = gb - ga

        out_abs[i] = gr
        out_pct[i] = gr / gb if gb != 0 else np.nan

    # Percentile CI
    ci_abs = np.nanpercentile(out_abs, [2.5, 97.5])
    ci_pct = np.nanpercentile(out_pct, [2.5, 97.5])

    # Two-sided bootstrap p-value: proportion of bootstrap distribution on the "wrong" side
    p_abs = 2 * min(np.mean(out_abs <= 0), np.mean(out_abs >= 0))

    return {
        "boot_ci_low_abs": float(ci_abs[0]),
        "boot_ci_high_abs": float(ci_abs[1]),
        "boot_p_two_sided_abs": float(p_abs),
        "boot_ci_low_pct": float(ci_pct[0]),
        "boot_ci_high_pct": float(ci_pct[1]),
    }

boot = bootstrap_gap_reduction(pdf, n_boot=5000, seed=42)

In [0]:
# 9) Present results
# -----------------------------
results = []

results.append({
    "metric": "Experience Lift (AI - Baseline)",
    "estimate": exp_lift["estimate"],
    "t": exp_lift["t"],
    "df": exp_lift["df"],
    "p": exp_lift["p"],
    "ci_low": exp_lift["ci_low"],
    "ci_high": exp_lift["ci_high"],
    "n_A": sY["n"],
    "n_B": sN["n"],
})

results.append({
    "metric": "Equity Gap (Baseline) = RN - UN",
    "estimate": gap_baseline,
    "t": np.nan,
    "df": np.nan,
    "p": np.nan,
    "ci_low": np.nan,
    "ci_high": np.nan,
    "n_A": RN["n"],
    "n_B": UN["n"],
})

results.append({
    "metric": "Equity Gap (AI) = RY - UY",
    "estimate": gap_ai,
    "t": np.nan,
    "df": np.nan,
    "p": np.nan,
    "ci_low": np.nan,
    "ci_high": np.nan,
    "n_A": RY["n"],
    "n_B": UY["n"],
})

results.append({
    "metric": "Gap Reduction (abs) = (RN-UN) - (RY-UY)",
    "estimate": gap_reduction_test["estimate"],
    "t": gap_reduction_test["t"],
    "df": gap_reduction_test["df"],
    "p": gap_reduction_test["p"],
    "ci_low": gap_reduction_test["ci_low"],
    "ci_high": gap_reduction_test["ci_high"],
    "n_A": RN["n"] + UN["n"] + RY["n"] + UY["n"],
    "n_B": np.nan,
})

results.append({
    "metric": "Gap Reduction (%) = GapReductionAbs / GapBaseline",
    "estimate": gap_reduction_pct,
    "t": np.nan,
    "df": np.nan,
    "p": np.nan,
    "ci_low": np.nan,
    "ci_high": np.nan,
    "n_A": np.nan,
    "n_B": np.nan,
})


out = pd.DataFrame(results)

# Nicely formatted view
pd.set_option("display.float_format", lambda x: f"{x:0.4f}" if pd.notnull(x) else "NaN")
display(out)

print("\nBootstrap robustness check (Gap Reduction):")
display(pd.DataFrame([boot]))