# UDF Distribution Test: Git Repo Code to Workers

## Test Plan:
1. **Test 1:** Show that imports from Git Repo FAIL on workers (without addPyFile)
2. **Test 2:** Fix with addPyFile and show it works
3. **Test 3:** Monitor which nodes execute the UDFs

**Prerequisites:**
- Cluster with at least 1 driver + 1 worker
- Git Repo with `taxi_udfs/calculations.py` module

**Update REPO_PATH below to match your workspace path!**
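
The notebook never shows `taxi_udfs/calculations.py`, so the following is only a hypothetical sketch of what such a module might look like. The three function names come from the imports used later in this notebook. The bodies, the `None` handling, and the tier thresholds are assumptions, chosen only to be consistent with the sample results further down (for example, a fare of 24.00 is labelled `expensive` and 85.00 `premium`). The real module may differ.

```python
# Hypothetical contents of taxi_udfs/calculations.py (assumed, not taken from the repo).
from typing import Optional


def calculate_fare_per_mile(fare: Optional[float], distance: Optional[float]) -> Optional[float]:
    """Return fare divided by distance, or None when it cannot be computed."""
    if fare is None or distance is None or distance <= 0:
        return None
    return fare / distance


def categorize_fare_amount(fare: Optional[float]) -> str:
    """Map a fare to a price tier. The thresholds here are assumptions."""
    if fare is None:
        return "unknown"
    if fare < 10:
        return "cheap"
    if fare < 20:
        return "moderate"
    if fare < 50:
        return "expensive"
    return "premium"


def calculate_tip_percentage(tip: Optional[float], fare: Optional[float]) -> Optional[float]:
    """Return the tip as a percentage of the fare, or None when fare is not positive."""
    if tip is None or fare is None or fare <= 0:
        return None
    return tip / fare * 100.0
```

Keeping the functions free of any Spark imports means they can be unit tested on the driver before being wrapped in UDFs.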

## Configuration - UPDATE THIS PATH!

In [0]:
# ============================================================
# UPDATE THIS PATH TO YOUR REPO LOCATION IN DATABRICKS
# ============================================================
REPO_PATH = "/Workspace/Users/sarbani.maiti@databricks.com/dab-mlops-demo-4dec/mlops-udf-demo"

print(f"📁 Using repo path: {REPO_PATH}")

📁 Using repo path: /Workspace/Users/sarbani.maiti@databricks.com/dab-mlops-demo-4dec/mlops-udf-demo


## Step 1: Cluster & Environment Info

In [0]:
import socket
import os

# Print cluster info
print("=" * 60)
print("CLUSTER INFORMATION")
print("=" * 60)
print(f"Driver Hostname: {socket.gethostname()}")
print(f"Spark Version: {spark.version}")

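# Get executor count (getExecutorMemoryStatus() includes the driver, so subtract 1)
try:
    num_executors = spark.sparkContext._jsc.sc().getExecutorMemoryStatus().size() - 1
    print(f"Number of Workers: {num_executors}")
except Exception:
    # _jsc is a private handle and may be unavailable on some cluster access modes
    print("Number of Workers: Unable to determine")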

print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")
print(f"Application ID: {spark.sparkContext.applicationId}")
print("=" * 60)

# Store driver hostname for later comparison
DRIVER_HOSTNAME = socket.gethostname()

CLUSTER INFORMATION
Driver Hostname: 1218-095811-m484hsz3-10-0-9-182
Spark Version: 3.5.2
Number of Workers: 2
Default Parallelism: 8
Application ID: app-20251210073449-0000


## Step 2: Create Test Data (Force Distribution to Workers)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, BooleanType

# Create enough rows that every partition has a meaningful share of work.
# The repartition() call below is what spreads the rows across partitions/workers.
data = [(i, float(i % 50 + 5), float((i % 100) + 10), float(i % 20), i % 24, i % 2 == 0) 
        for i in range(10000)]

schema = StructType([
    StructField("trip_id", IntegerType(), False),
    StructField("distance", DoubleType(), False),
    StructField("fare", DoubleType(), False),
    StructField("tip", DoubleType(), False),
    StructField("hour", IntegerType(), False),
    StructField("is_weekend", BooleanType(), False),
])

# Repartition to ensure parallel execution across workers
num_partitions = max(spark.sparkContext.defaultParallelism * 2, 8)
test_df = spark.createDataFrame(data, schema).repartition(num_partitions)

print(f"✅ Created DataFrame with {test_df.count()} rows")
print(f"✅ Partitioned into {test_df.rdd.getNumPartitions()} partitions")
print(f"✅ Data will be processed across worker nodes")

# Cache for repeated tests
test_df.cache()
test_df.count()  # Force cache materialization

✅ Created DataFrame with 10000 rows
✅ Partitioned into 16 partitions
✅ Data will be processed across worker nodes


10000
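
To make the synthetic columns easier to reason about, the same generator expression can be run in plain Python without Spark. This is a small sketch; `build_rows` is a hypothetical helper name and is not part of the notebook.

```python
def build_rows(n):
    """Rebuild the synthetic rows with the same formulas as the notebook cell."""
    return [
        (i, float(i % 50 + 5), float((i % 100) + 10), float(i % 20), i % 24, i % 2 == 0)
        for i in range(n)
    ]


rows = build_rows(10000)

# Print the value range of each numeric column.
for idx, name in enumerate(["trip_id", "distance", "fare", "tip", "hour"]):
    values = [row[idx] for row in rows]
    print(f"{name}: min={min(values)}, max={max(values)}")

# Inspect a single row, for example trip_id 1814.
print(rows[1814])
```

Because `distance` is always at least 5.0 and `fare` at least 10.0, the fare-per-mile and tip-percentage calculations never divide by zero on this data. Note that `is_weekend` is simply "even `trip_id`", not a real calendar flag.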

---
## TEST 2: Fix with spark.sparkContext.addPyFile() using ZIP

This test shows how to distribute Git Repo code to workers: **ZIP the module**, keeping its `taxi_udfs/` package directory inside the archive, and then add the zip file to the Spark context.

In [0]:
import zipfile
import os

# Create a zip file containing the taxi_udfs module
ZIP_PATH = "/tmp/taxi_udfs.zip"

print("🔄 Creating zip file of taxi_udfs module...")
print(f"   Source: {REPO_PATH}/taxi_udfs/")
print(f"   Destination: {ZIP_PATH}")
print("")

# Create the zip file
with zipfile.ZipFile(ZIP_PATH, 'w', zipfile.ZIP_DEFLATED) as zipf:
    # Add __init__.py
    init_path = f"{REPO_PATH}/taxi_udfs/__init__.py"
    zipf.write(init_path, "taxi_udfs/__init__.py")
    print(f"   Added: taxi_udfs/__init__.py")
    
    # Add calculations.py
    calc_path = f"{REPO_PATH}/taxi_udfs/calculations.py"
    zipf.write(calc_path, "taxi_udfs/calculations.py")
    print(f"   Added: taxi_udfs/calculations.py")

print("")
print(f"✅ Created zip file: {ZIP_PATH}")

# List contents of zip to verify
with zipfile.ZipFile(ZIP_PATH, 'r') as zipf:
    print(f"   Contents: {zipf.namelist()}")

🔄 Creating zip file of taxi_udfs module...
   Source: /Workspace/Users/sarbani.maiti@databricks.com/dab-mlops-demo-4dec/mlops-udf-demo/taxi_udfs/
   Destination: /tmp/taxi_udfs.zip

   Added: taxi_udfs/__init__.py
   Added: taxi_udfs/calculations.py

✅ Created zip file: /tmp/taxi_udfs.zip
   Contents: ['taxi_udfs/__init__.py', 'taxi_udfs/calculations.py']
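
The cell above adds the two files by name. If the package grows, a helper that walks the package directory avoids listing each file by hand. The following is a minimal sketch under that assumption; `zip_package` is a hypothetical helper, not something defined in the repo.

```python
import os
import zipfile


def zip_package(package_dir, zip_path):
    """Zip every .py file under package_dir, keeping the package folder as the archive root."""
    package_dir = os.path.abspath(package_dir)
    parent = os.path.dirname(package_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(package_dir):
            for name in sorted(files):
                if name.endswith(".py"):
                    full_path = os.path.join(root, name)
                    # Archive names are relative to the parent, e.g. taxi_udfs/calculations.py
                    arcname = os.path.relpath(full_path, parent).replace(os.sep, "/")
                    zf.write(full_path, arcname)
        return sorted(zf.namelist())
```

In this notebook it could be called as `zip_package(f"{REPO_PATH}/taxi_udfs", ZIP_PATH)`. Writing the zip outside the package directory, as `/tmp/taxi_udfs.zip` does, keeps the archive from being picked up by a later walk.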


### Add the ZIP file to the Spark context using addPyFile() (this distributes it to ALL workers)

In [0]:
# Add the ZIP file to Spark context - this distributes to ALL workers
spark.sparkContext.addPyFile(ZIP_PATH)

print("✅ Added zip file to Spark context")
print("✅ Module will be distributed to all worker nodes")
print("")
print("📝 Workers can now import using: from taxi_udfs.calculations import ...")

✅ Added zip file to Spark context
✅ Module will be distributed to all worker nodes

📝 Workers can now import using: from taxi_udfs.calculations import ...
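
Why a zip file is enough: Python can import packages directly from a zip archive that is on `sys.path`, and `addPyFile()` makes the archive available on the Python path of the worker processes. The sketch below reproduces that import mechanism locally in plain Python, without Spark. The package name `demo_udfs` and the function `double` are hypothetical stand-ins, used so the example does not clash with the real `taxi_udfs` package.

```python
import importlib
import os
import sys
import tempfile
import zipfile

tmp_dir = tempfile.mkdtemp()
zip_path = os.path.join(tmp_dir, "demo_udfs.zip")

# Build a tiny stand-in package directly inside the archive (hypothetical names).
with zipfile.ZipFile(zip_path, "w") as zf:
    zf.writestr("demo_udfs/__init__.py", "")
    zf.writestr("demo_udfs/calculations.py", "def double(x):\n    return 2 * x\n")

# Before the archive is on sys.path, the import fails, much like on a worker
# that has never received the module.
try:
    importlib.import_module("demo_udfs.calculations")
    imported_before = True
except ModuleNotFoundError:
    imported_before = False

# After the archive is added to sys.path, the package resolves from inside the zip.
sys.path.insert(0, zip_path)
importlib.invalidate_caches()
calculations = importlib.import_module("demo_udfs.calculations")

print(imported_before)          # False
print(calculations.double(21))  # 42
```

This is also why the archive must contain the `taxi_udfs/` directory itself: the import path `taxi_udfs.calculations` is resolved relative to the root of the zip.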


### Define UDFs that run on workers

In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType, StringType
import socket
import os

# UDF 1: Calculate fare per mile
@udf(returnType=FloatType())
def fare_per_mile_udf(fare, distance):
    """
    Calculate fare per mile using the distributed module.
    Since we zipped with package structure (taxi_udfs/), 
    we import using the full package path.
    """
    from taxi_udfs.calculations import calculate_fare_per_mile
    return calculate_fare_per_mile(fare, distance)

# UDF 2: Categorize fare
@udf(returnType=StringType())
def categorize_fare_udf(fare):
    """Categorize fare amount into price tiers."""
    from taxi_udfs.calculations import categorize_fare_amount
    return categorize_fare_amount(fare)

# UDF 3: Calculate tip percentage
@udf(returnType=FloatType())
def tip_percentage_udf(tip, fare):
    """Calculate tip as percentage of fare."""
    from taxi_udfs.calculations import calculate_tip_percentage
    return calculate_tip_percentage(tip, fare)

# UDF 4: Track execution location (which worker runs this)
@udf(returnType=StringType())
def get_execution_location():
    """
    Returns information about WHERE this UDF is executing.
    This helps verify that UDFs run on workers, not just the driver.
    """
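    hostname = socket.gethostname()
    pid = os.getpid()
    # SPARK_EXECUTOR_ID may not be set in the Python worker process, in which case
    # the "driver" fallback appears even for rows processed on worker hosts
    # (as in the output below). Rely on the hostname to tell nodes apart.
    executor_id = os.environ.get("SPARK_EXECUTOR_ID", "driver")
    return f"host={hostname}|pid={pid}|executor={executor_id}"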

print("✅ UDFs defined successfully with ZIP + addPyFile fix")
print("")
print("📝 Key insight:")
print("   - We zipped the module WITH its package structure (taxi_udfs/)")
print("   - So we can use the full import: from taxi_udfs.calculations import ...")
print("   - This matches how it works on the driver!")

✅ UDFs defined successfully with ZIP + addPyFile fix

📝 Key insight:
   - We zipped the module WITH its package structure (taxi_udfs/)
   - So we can use the full import: from taxi_udfs.calculations import ...
   - This matches how it works on the driver!


### Execute the UDFs - THIS SHOULD WORK NOW!

In [0]:
print("🔄 Executing UDFs on workers...")
print("")

result_df = test_df.select(
    col("trip_id"),
    col("fare"),
    col("distance"),
    col("tip"),
    fare_per_mile_udf(col("fare"), col("distance")).alias("fare_per_mile"),
    categorize_fare_udf(col("fare")).alias("fare_category"),
    tip_percentage_udf(col("tip"), col("fare")).alias("tip_pct"),
    get_execution_location().alias("executed_on")
)

# Force execution with collect()
results = result_df.collect()

print(f"✅ SUCCESS! Processed {len(results)} rows")
print("")
print("📊 Sample Results (first 10 rows):")
print("-" * 100)
print(f"{'Trip ID':<10} {'Fare':>10} {'Distance':>10} {'$/Mile':>10} {'Category':<12} {'Tip%':>8} {'Executed On'}")
print("-" * 100)

for row in results[:10]:
    print(f"{row.trip_id:<10} ${row.fare:>8.2f} {row.distance:>10.1f} ${row.fare_per_mile:>8.2f} {row.fare_category:<12} {row.tip_pct:>7.1f}% {row.executed_on}")

print("-" * 100)

🔄 Executing UDFs on workers...

✅ SUCCESS! Processed 10000 rows

📊 Sample Results (first 10 rows):
----------------------------------------------------------------------------------------------------
Trip ID          Fare   Distance     $/Mile Category         Tip% Executed On
----------------------------------------------------------------------------------------------------
1814       $   24.00       19.0 $    1.26 expensive       58.3% host=1218-095811-m484hsz3-10-0-11-143|pid=11710|executor=driver
2029       $   39.00       34.0 $    1.15 expensive       23.1% host=1218-095811-m484hsz3-10-0-11-143|pid=11710|executor=driver
1675       $   85.00       30.0 $    2.83 premium         17.6% host=1218-095811-m484hsz3-10-0-11-143|pid=11710|executor=driver
1321       $   31.00       26.0 $    1.19 expensive        3.2% host=1218-095811-m484hsz3-10-0-11-143|pid=11710|executor=driver
1127       $   37.00       32.0 $    1.16 expensive       18.9% host=1218-095811-m484hsz3-10-0-11-143|pid=117

---
## TEST 3: Monitor Which Workers Executed the UDFs

In [0]:
# Analyze execution distribution across workers
print("=" * 70)
print("📊 UDF EXECUTION DISTRIBUTION ACROSS NODES")
print("=" * 70)
print("")

# Group by execution location to see distribution
execution_stats = result_df.groupBy("executed_on").count().orderBy("count", ascending=False)
execution_stats.show(truncate=False)

# Get detailed stats (collect once and reuse the rows; each collect() re-runs the UDFs)
stat_rows = execution_stats.collect()
unique_executors = [row.executed_on for row in stat_rows]
total_tasks = sum(row["count"] for row in stat_rows)

print(f"📈 Execution Statistics:")
print(f"   Total rows processed: {total_tasks}")
print(f"   Unique executors used: {len(unique_executors)}")
print("")

print("📍 Executor Details:")
for exec_info in unique_executors:
    # Parse executor info (split on the first "=" only, in case a value contains "=")
    parts = dict(p.split("=", 1) for p in exec_info.split("|"))
    print(f"   • Hostname: {parts.get('host', 'N/A')}")
    print(f"     PID: {parts.get('pid', 'N/A')}, Executor ID: {parts.get('executor', 'N/A')}")
    print("")

📊 UDF EXECUTION DISTRIBUTION ACROSS NODES

+---------------------------------------------------------------+-----+
|executed_on                                                    |count|
+---------------------------------------------------------------+-----+
|host=1218-095811-m484hsz3-10-0-0-109|pid=12352|executor=driver |1250 |
|host=1218-095811-m484hsz3-10-0-0-109|pid=12350|executor=driver |1250 |
|host=1218-095811-m484hsz3-10-0-0-109|pid=12335|executor=driver |1250 |
|host=1218-095811-m484hsz3-10-0-0-109|pid=12344|executor=driver |1250 |
|host=1218-095811-m484hsz3-10-0-11-143|pid=12618|executor=driver|1250 |
|host=1218-095811-m484hsz3-10-0-11-143|pid=12620|executor=driver|1250 |
|host=1218-095811-m484hsz3-10-0-11-143|pid=12614|executor=driver|1250 |
|host=1218-095811-m484hsz3-10-0-11-143|pid=12615|executor=driver|1250 |
+---------------------------------------------------------------+-----+

📈 Execution Statistics:
   Total rows processed: 10000
   Unique executors used: 8

📍 Execut
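
The eight distinct `executed_on` values in the table are host and PID combinations, that is, separate Python worker processes rather than eight separate machines. Rolling the counts up by hostname makes this visible. The sketch below does that in plain Python using the values copied from the table above; `parse_location` is a hypothetical helper name.

```python
from collections import Counter

# (executed_on, count) pairs copied from the table above.
observed = [
    ("host=1218-095811-m484hsz3-10-0-0-109|pid=12352|executor=driver", 1250),
    ("host=1218-095811-m484hsz3-10-0-0-109|pid=12350|executor=driver", 1250),
    ("host=1218-095811-m484hsz3-10-0-0-109|pid=12335|executor=driver", 1250),
    ("host=1218-095811-m484hsz3-10-0-0-109|pid=12344|executor=driver", 1250),
    ("host=1218-095811-m484hsz3-10-0-11-143|pid=12618|executor=driver", 1250),
    ("host=1218-095811-m484hsz3-10-0-11-143|pid=12620|executor=driver", 1250),
    ("host=1218-095811-m484hsz3-10-0-11-143|pid=12614|executor=driver", 1250),
    ("host=1218-095811-m484hsz3-10-0-11-143|pid=12615|executor=driver", 1250),
]


def parse_location(text):
    """Turn 'host=...|pid=...|executor=...' into a dict."""
    return dict(part.split("=", 1) for part in text.split("|"))


rows_per_host = Counter()
processes_per_host = Counter()
for location, count in observed:
    info = parse_location(location)
    rows_per_host[info["host"]] += count
    processes_per_host[info["host"]] += 1

for host in sorted(rows_per_host):
    print(f"{host}: {rows_per_host[host]} rows in {processes_per_host[host]} Python processes")
```

For this run the roll-up gives two hosts with four Python processes and 5000 rows each, which matches the two workers reported in Step 1.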

In [0]:
# Check if execution happened on workers (not just driver)
print("=" * 70)
print("🎯 WORKER EXECUTION VERIFICATION")
print("=" * 70)
print("")
print(f"Driver hostname: {DRIVER_HOSTNAME}")
print("")

# Count executions on driver vs workers
driver_count = 0
worker_count = 0
worker_hosts = set()

for row in execution_stats.collect():
    exec_info = row.executed_on
    count = row["count"]
    # Compare the parsed hostname exactly; a substring test could also match a longer hostname
    parts = dict(p.split("=", 1) for p in exec_info.split("|"))
    host = parts.get('host', 'unknown')
    if host == DRIVER_HOSTNAME:
        driver_count += count
    else:
        worker_count += count
        worker_hosts.add(host)

print(f"Rows processed on DRIVER: {driver_count}")
print(f"Rows processed on WORKERS: {worker_count}")
print("")

if worker_count > 0:
    print("✅ CONFIRMED: UDFs are executing on WORKER NODES!")
    print(f"   Worker hosts: {worker_hosts}")
    pct = (worker_count / total_tasks) * 100
    print(f"   {pct:.1f}% of work done on workers")
elif driver_count > 0 and worker_count == 0:
    print("⚠️  All execution happened on the DRIVER")
    print("   Possible reasons:")
    print("   - Single-node cluster (no workers)")
    print("   - Data too small / not enough partitions")
    print("   - Local mode execution")
else:
    print("❓ Unable to determine execution location")

🎯 WORKER EXECUTION VERIFICATION

Driver hostname: 1218-095811-m484hsz3-10-0-9-182

Rows processed on DRIVER: 0
Rows processed on WORKERS: 10000

✅ CONFIRMED: UDFs are executing on WORKER NODES!
   Worker hosts: {'1218-095811-m484hsz3-10-0-11-143', '1218-095811-m484hsz3-10-0-0-109'}
   100.0% of work done on workers
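
The driver versus worker split can also be checked without Spark once the per-host row counts are known. The sketch below uses the hostnames and counts reported in the outputs above; `split_driver_worker` is a hypothetical helper. It also illustrates why an exact hostname comparison is safer than a substring test.

```python
def split_driver_worker(driver_host, rows_per_host):
    """Return (driver_rows, worker_rows, worker_pct) using exact hostname matching."""
    driver_rows = rows_per_host.get(driver_host, 0)
    worker_rows = sum(count for host, count in rows_per_host.items() if host != driver_host)
    total = driver_rows + worker_rows
    worker_pct = 100.0 * worker_rows / total if total else 0.0
    return driver_rows, worker_rows, worker_pct


# Values taken from the outputs above.
driver_host = "1218-095811-m484hsz3-10-0-9-182"
rows_per_host = {
    "1218-095811-m484hsz3-10-0-0-109": 5000,
    "1218-095811-m484hsz3-10-0-11-143": 5000,
}

summary = split_driver_worker(driver_host, rows_per_host)
print(summary)  # (0, 10000, 100.0)

# A substring test can misclassify hosts whose names share a prefix (made-up names).
substring_false_positive = "node-1" in "host=node-10|pid=1|executor=driver"
print(substring_false_positive)  # True
```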
