In [0]:
# Service-principal credentials and the storage account name are read from the
# "local-scope" secret scope. Never print or display these values.
client_id = dbutils.secrets.get(scope="local-scope", key="sp-client-id")
tenant_id = dbutils.secrets.get(scope="local-scope", key="sp-tenant-id")
client_secret = dbutils.secrets.get(scope="local-scope", key="sp-client-secret")
storage_account = dbutils.secrets.get(scope="local-scope", key="storage-account-name")






In [0]:
# Service-principal OAuth settings for ADLS Gen2 access via the ABFS driver.
# Each Spark conf key is scoped to this storage account's dfs endpoint; the
# pairs are applied in order, one spark.conf.set call per entry.
for conf_key, conf_value in (
    (f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net",
     "OAuth"),
    (f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
     "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    (f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net",
     client_id),
    (f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net",
     client_secret),
    # Token endpoint of the tenant that owns the service principal
    (f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
     f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"),
):
    spark.conf.set(conf_key, conf_value)

In [0]:
# Location of the source CSV inside the "raw" container of the storage account
raw_path = f"abfss://raw@{storage_account}.dfs.core.windows.net/dataset.csv/"

# List what is stored at that location to confirm access works
raw_listing = dbutils.fs.ls(raw_path)
display(raw_listing)

In [0]:
# Source location for the CSV read; point this elsewhere if your file differs
src = raw_path  # or "criteo_uplift.csv"

# First line is a header; column types are inferred from the data
csv_reader = spark.read.option("header", "true")
csv_reader = csv_reader.option("inferSchema", "true")
df_raw = csv_reader.csv(src)

# Quick size check, then a small preview of the loaded rows
print("Row count:", df_raw.count())
print("Column count:", len(df_raw.columns))
display(df_raw.limit(10))


In [0]:
# Lower-case the column names so matching does not depend on header casing
cols = list(map(str.lower, df_raw.columns))

# Column names commonly used for the label and for the treatment flag
known_label_names = {"conversion", "response", "visit", "label", "click"}
known_treat_names = {"treatment", "exposed", "exposure", "test_group"}

# Keep the columns (in DataFrame order) whose name is one of the known names
candidate_labels = [name for name in cols if name in known_label_names]
candidate_treat = [name for name in cols if name in known_treat_names]

print("Label candidates:", candidate_labels)
print("Treatment candidates:", candidate_treat)


In [0]:
# Print the column names exactly as loaded (original casing), so LABEL_COL and
# TREAT_COL can be set to names that really exist in the data.
print(df_raw.columns)


In [0]:
LABEL_COL = "conversion"  # <-- change to the actual label name found above
TREAT_COL = "treatment"   # <-- if present; else set to None

from pyspark.sql.functions import col

df = df_raw.select(*df_raw.columns)  # start from raw

# Validate the label name case-insensitively on BOTH sides (the same way the
# treatment column is checked in the sampling cell), so a mixed-case LABEL_COL
# is not rejected just because of its casing.
if not LABEL_COL or LABEL_COL.lower() not in [c.lower() for c in df.columns]:
    raise ValueError("Set LABEL_COL to an existing label column name.")

# Basic EDA: class balance.
# A row counts as positive when its label, cast to int, equals 1; the average
# of that 0/1 flag is the positive rate.
pos_rate = (
    df.select((col(LABEL_COL).cast("int") == 1).cast("int").alias("pos"))
      .agg({"pos": "avg"})
      .collect()[0][0]
)

# The average is None when there was nothing to average (e.g. an empty
# DataFrame or only NULL labels); formatting None with :.4f would raise.
if pos_rate is None:
    print("Positive rate (approx): n/a (no label values could be evaluated)")
else:
    print(f"Positive rate (approx): {pos_rate:.4f}")


In [0]:
from pyspark.sql.functions import col

# Names of the label and treatment columns used by the cells that follow
LABEL_COL, TREAT_COL = "conversion", "treatment"

# Working DataFrame: all columns of the raw load
# (narrow the selection here if only some columns are needed)
df = df_raw.select("*")

# Sanity check: print the schema, then peek at the two key columns
df.printSchema()
key_columns_preview = df.select(LABEL_COL, TREAT_COL)
key_columns_preview.show(5)


In [0]:
from functools import reduce
from operator import and_

from pyspark.sql.functions import rand
from pyspark.sql.window import Window
import pyspark.sql.functions as F

TOTAL = 1_000_000
seed = 42

# Attach a seeded uniform random key; rows are ranked by it inside each stratum.
# NOTE(review): run-to-run reproducibility presumably requires the source to be
# read with the same partitioning and row order each time — confirm.
df_rand = df.withColumn("_rand", rand(seed))

# Strata: always the label, plus the treatment column when it exists
strata_cols = [LABEL_COL]
if TREAT_COL and TREAT_COL.lower() in [c.lower() for c in df.columns]:
    strata_cols.append(TREAT_COL)

# 1) Row count per stratum (one collected Row per distinct key combination)
df_counts = df_rand.groupBy(*strata_cols).count().collect()
total_count = sum(r["count"] for r in df_counts)
if total_count == 0:
    raise ValueError("Cannot sample: the input DataFrame has no rows.")

# Cannot draw more rows than exist
target = min(TOTAL, total_count)
frac_global = target / total_count

# 2) Exact proportional quota per stratum (largest-remainder allocation).
#    DataFrame.sample(fraction=...) only returns APPROXIMATELY fraction * N
#    rows, so it can fall short of TOTAL and a later trim cannot repair that.
#    Integer quotas that sum to `target` make the final size exact and keep
#    each stratum's share proportional.
floors = [r["count"] * target // total_count for r in df_counts]
remainders = [r["count"] * target % total_count for r in df_counts]
leftover = target - sum(floors)  # rows still to hand out after flooring
by_remainder = sorted(range(len(df_counts)), key=lambda i: remainders[i], reverse=True)
quotas = list(floors)
for i in by_remainder[:leftover]:
    quotas[i] += 1

print(f"Sampling fraction: {frac_global:.6f}")
print("Stratum quotas:",
      [(tuple(r[c] for c in strata_cols), q) for r, q in zip(df_counts, quotas)])

# 3) Per-row quota column. eqNullSafe is used so a stratum whose key is NULL
#    still matches its rows (a plain == against NULL is never true, which would
#    silently drop those rows even though they were counted above).
quota_col = F.lit(0)
for r, quota in zip(df_counts, quotas):
    in_stratum = reduce(and_, [F.col(c).eqNullSafe(r[c]) for c in strata_cols])
    quota_col = F.when(in_stratum, F.lit(quota)).otherwise(quota_col)

# 4) Rank rows by the random key inside each stratum and keep the first
#    `quota` of them. Partitioning the window by stratum also avoids an
#    unpartitioned Window.orderBy over the whole dataset.
w = Window.partitionBy(*strata_cols).orderBy(F.col("_rand"))
df_sample = (df_rand
             .withColumn("_quota", quota_col)
             .withColumn("_rownum", F.row_number().over(w))
             .where(F.col("_rownum") <= F.col("_quota")))

# Drop the helper columns so the output has the same columns as `df`
df_1m = df_sample.drop("_rownum", "_quota", "_rand")

print("Final sample:", df_1m.count())
display(df_1m.limit(5))


In [0]:
# Destination folder for the sampled dataset in the "curated" container
curated_path = f"abfss://curated@{storage_account}.dfs.core.windows.net/criteo_uplift_1m/"

# Repartition to 8 before writing (a small number is fine for 1M rows);
# "overwrite" mode replaces whatever is already at the destination.
curated_writer = df_1m.repartition(8).write.mode("overwrite")
curated_writer.parquet(curated_path)

print("Wrote:", curated_path)
