In [None]:
import time, json
from datetime import datetime, timezone

def utc_now() -> str:
    """Return the current time in UTC as an ISO 8601 string.

    The datetime is timezone-aware, so the string carries an explicit
    ``+00:00`` offset.
    """
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()

# Run parameters. Each name keeps its value if it is already defined when this
# cell runs (presumably injected by a calling pipeline or parent notebook --
# confirm against the caller); otherwise it falls back to the default below.

# Unique-ish id for ad-hoc runs: "manual_" + current epoch seconds.
try:
    runId
except NameError:
    runId = f"manual_{int(time.time())}"

try:
    workloadName
except NameError:
    workloadName = "unknown"

try:
    tier
except NameError:
    tier = "unknown"

# Default is the string "0", not a number.
try:
    weight
except NameError:
    weight = "0"

In [None]:
import time, json
from pyspark.sql import functions as F

# Wall-clock start of the run; both values end up in the result payload.
start_ts = time.time()
start_utc = utc_now()

# Workload sizing.
n = 40_000_000                # rows in each input table
KEY_CARDINALITY = 3_000_000   # distinct join keys (id % KEY_CARDINALITY), ~n / KEY_CARDINALITY rows per key per side
INPUT_PARTITIONS = 400        # target partition count when each input is repartitioned by key


def _keyed_table(num_rows, value_col, seed):
    """Build one synthetic join input.

    Args:
        num_rows: number of rows to generate (ids 0 .. num_rows - 1).
        value_col: name of the random value column to add.
        seed: seed passed to ``F.rand`` for the value column.

    Returns:
        A DataFrame with columns ``id``, ``k`` (int, ``id % KEY_CARDINALITY``)
        and ``value_col`` (double, ``rand * 1000``), repartitioned by ``k``
        into ``INPUT_PARTITIONS`` partitions.
    """
    return (spark.range(0, num_rows)
                 .withColumn("k", (F.col("id") % KEY_CARDINALITY).cast("int"))
                 .withColumn(value_col, (F.rand(seed=seed) * 1000).cast("double"))
                 .repartition(INPUT_PARTITIONS, "k"))


# Two big tables with the same join-key distribution, so the join below has to
# move a lot of data.
left = _keyed_table(n, "v1", seed=21)
right = _keyed_table(n, "v2", seed=22)

# Run each input plan once so that work is actually executed and billed.
# NOTE(review): nothing is cached or persisted here, so the join further down
# presumably re-evaluates both plans instead of reusing these results --
# confirm that double execution is intended.
_ = left.count()
_ = right.count()

# Pause for three minutes between the input counts and the heavy stage. The
# pause is part of the measured run, since start_ts was taken before it.
# NOTE(review): whether executors actually stay allocated during the sleep
# depends on the pool's allocation settings -- confirm.
print("Holding executors to simulate long backfill...")
time.sleep(180)

# Inner equi-join on k. No join hint is set; presumably the inputs are large
# enough that Spark will not pick a broadcast join -- verify in the query plan.
j = left.join(right, on="k", how="inner")

# Per-key aggregates over the joined rows, then a repartition of the grouped
# result to add one more shuffle.
per_key_aggs = [
    F.sum("v1").alias("sum_v1"),
    F.sum("v2").alias("sum_v2"),
    F.count("*").alias("cnt"),
]
heavy = j.groupBy("k").agg(*per_key_aggs).repartition(200)

# count() is the action that triggers the join and aggregation.
rows = heavy.count()

# Elapsed wall-clock seconds since start_ts, rounded to two decimals.
duration = round(time.time() - start_ts, 2)

# Run summary: caller-supplied identifiers plus timing and size metrics.
result = {
    "runId": runId,
    "workloadName": workloadName,
    "tier": tier,
    "weight": weight,
    "startTimeUtc": start_utc,
    "durationSec": duration,
    "metric": {
        "resultRows": rows,
        "n": n,
    },
}
print(result)

# End the notebook run, passing the JSON-encoded summary as the exit value.
mssparkutils.notebook.exit(json.dumps(result))
