In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/koushik/spark"

from pyspark.sql import SparkSession

try:
    spark.stop()
except:
    pass

spark = SparkSession.builder \
    .appName("4_Distributed_Processing") \
    .master("local[4]") \
    .config("spark.driver.memory", "6g") \
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print("Spark:", spark.version)

26/02/26 11:46:04 WARN Utils: Your hostname, KoushikPC resolves to a loopback address: 127.0.1.2; using 10.255.255.254 instead (on interface lo)
26/02/26 11:46:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/26 11:46:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/26 11:46:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark: 3.5.1


In [3]:
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
from pyspark import StorageLevel
import time

print("DISTRIBUTED PROCESSING ")

df = spark.read.parquet("/home/koushik/pp_features")

print("\n[1] Persisting data...")
df.persist(StorageLevel.MEMORY_AND_DISK)
t = time.time()
total = df.count()
print(f"    Rows: {total:,} | Persist time: {time.time()-t:.2f}s")

DISTRIBUTED PROCESSING 


                                                                                


[1] Persisting data...


[Stage 2:>                                                          (0 + 4) / 4]

    Rows: 30,856,185 | Persist time: 54.12s


                                                                                

In [4]:

print("\n Broadcast Join...")

df_raw = spark.read.parquet("/home/koushik/pp_parquet")


prop_lookup = spark.createDataFrame([
    ("D", "Detached"),
    ("S", "Semi-Detached"),
    ("T", "Terraced"),
    ("F", "Flat/Maisonette"),
    ("O", "Other")
], ["prop_type", "prop_description"])

t = time.time()
df_joined = df_raw.join(
    broadcast(prop_lookup),
    on="prop_type",
    how="left"
)
count = df_joined.count()
join_time = time.time() - t

print(f"    Joined rows  : {count:,}")
print(f"    Join time    : {join_time:.2f}s")
print("    Sample:")
df_joined.select("prop_type", "prop_description", "price").show(5)


 Broadcast Join...


                                                                                

    Joined rows  : 30,856,185
    Join time    : 2.52s
    Sample:
+---------+----------------+------+
|prop_type|prop_description| price|
+---------+----------------+------+
|        D|        Detached| 75000|
|        T|        Terraced| 49995|
|        T|        Terraced| 79995|
|        S|   Semi-Detached|151000|
|        S|   Semi-Detached|146500|
+---------+----------------+------+
only showing top 5 rows



In [5]:

print("\n Error Handling with Data Lineage...")

def safe_load_parquet(path):
    try:
        df = spark.read.parquet(path)
        count = df.count()
        print(f"     Loaded: {path} → {count:,} rows")
        return df
    except Exception as e:
        print(f"     Failed to load {path}: {str(e)}")
        return None

def safe_model_train(model, train_df, name="Model"):
    try:
        t = time.time()
        fitted = model.fit(train_df)
        print(f"     {name} trained in {time.time()-t:.1f}s")
        return fitted
    except MemoryError:
        print(f"     {name} failed: Out of Memory — try smaller sample")
        return None
    except Exception as e:
        print(f"     {name} failed: {str(e)}")
        return None

df_features = safe_load_parquet("/home/koushik/pp_features")
df_missing   = safe_load_parquet("/home/koushik/nonexistent_path")  # will fail safely

print("\n[4] Unpersisting to free memory...")
df.unpersist()
print("     df unpersisted")

print("\n[5] Persist only model-ready data...")
df_sample = df_features.sample(fraction=0.2, seed=42)
train_df, test_df = df_sample.randomSplit([0.8, 0.2], seed=42)

train_df.persist(StorageLevel.MEMORY_AND_DISK)
test_df.persist(StorageLevel.MEMORY_AND_DISK)

train_count = train_df.count()
test_count  = test_df.count()
print(f"     train_df persisted: {train_count:,} rows")
print(f"     test_df  persisted: {test_count:,} rows")


 Error Handling with Data Lineage...


                                                                                

     Loaded: /home/koushik/pp_features → 30,856,185 rows
     Failed to load /home/koushik/nonexistent_path: [PATH_NOT_FOUND] Path does not exist: file:/home/koushik/nonexistent_path.

[4] Unpersisting to free memory...
     df unpersisted

[5] Persist only model-ready data...




     train_df persisted: 4,939,684 rows
     test_df  persisted: 1,235,407 rows


                                                                                

In [6]:
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="log_price", predictionCol="prediction", metricName="rmse"
)

rf  = safe_model_train(
    RandomForestRegressor(featuresCol="features", labelCol="log_price",
                          numTrees=20, maxDepth=6, maxBins=64, seed=42),
    train_df, "Random Forest"
)

dt  = safe_model_train(
    DecisionTreeRegressor(featuresCol="features", labelCol="log_price",
                          maxDepth=6, maxBins=64, seed=42),
    train_df, "Decision Tree"
)

for name, model in [("Random Forest", rf), ("Decision Tree", dt)]:
    if model is not None:
        pred = model.transform(test_df)
        rmse = evaluator.setMetricName("rmse").evaluate(pred)
        r2   = evaluator.setMetricName("r2").evaluate(pred)
        print(f"\n{name} → RMSE: {rmse:.4f} | R2: {r2:.4f}")

train_df.unpersist()
test_df.unpersist()
print("\n All caches cleared — memory freed")
print(" Distributed Processing Complete")

                                                                                

     Random Forest trained in 121.0s


                                                                                

     Decision Tree trained in 41.5s


                                                                                


Random Forest → RMSE: 0.5809 | R2: 0.5393

Decision Tree → RMSE: 0.5789 | R2: 0.5424

 All caches cleared — memory freed
 Distributed Processing Complete


In [7]:
t = time.time()
df_result = df.groupBy().avg("log_price").collect()
df_time = time.time() - t
print(f"DataFrame avg log_price : {df_result[0][0]:.4f}")
print(f"DataFrame Time          : {df_time:.2f}s")

t = time.time()
rdd = df.select("log_price").rdd.map(lambda x: x[0])
rdd_count  = rdd.count()
rdd_sum    = rdd.sum()
rdd_avg    = rdd_sum / rdd_count
rdd_time   = time.time() - t
print(f"\nRDD avg log_price       : {rdd_avg:.4f}")
print(f"RDD Time                : {rdd_time:.2f}s")

print(f"\nDataFrame is {rdd_time/df_time:.2f}x FASTER than RDD")

DataFrame avg log_price : 11.9393
DataFrame Time          : 0.25s





RDD avg log_price       : 11.9393
RDD Time                : 33.93s

DataFrame is 136.49x FASTER than RDD


                                                                                

In [None]:
import time
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml.regression import DecisionTreeRegressor

print("===== STRONG SCALING =====")
print("Same data size, increasing cores\n")

DATA_PATH = "/home/koushik/pp_features"
FRACTION  = 0.1   # fixed data size

cores_list    = [1, 2, 4]
strong_times  = []
strong_rmse   = []

for cores in cores_list:
    # Restart spark with different cores
    spark.stop()
    spark = SparkSession.builder \
        .appName(f"Strong_Scaling_{cores}cores") \
        .master(f"local[{cores}]") \
        .config("spark.driver.memory", "6g") \
        .config("spark.sql.shuffle.partitions", "50") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    df = spark.read.parquet(DATA_PATH).sample(fraction=FRACTION, seed=42)
    tr, te = df.randomSplit([0.8, 0.2], seed=42)
    tr.cache(); tr.count()

    t = time.time()
    model = DecisionTreeRegressor(
        featuresCol="features", labelCol="log_price",
        maxDepth=5, maxBins=32
    ).fit(tr)
    elapsed = time.time() - t

    strong_times.append(elapsed)
    rows = tr.count()
    print(f"Cores: {cores} | Rows: {rows:,} | Time: {elapsed:.1f}s")
    tr.unpersist()

speedup = [strong_times[0]/t for t in strong_times]
efficiency = [s/c * 100 for s, c in zip(speedup, cores_list)]

print("\n--- Strong Scaling Summary ---")
for c, t, s, e in zip(cores_list, strong_times, speedup, efficiency):
    print(f"Cores: {c} | Time: {t:.1f}s | Speedup: {s:.2f}x | Efficiency: {e:.1f}%")

In [8]:
import time
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml.regression import DecisionTreeRegressor

print("STRONG SCALING")
print("Same data size, increasing cores\n")

DATA_PATH = "/home/koushik/pp_features"
FRACTION  = 0.1 
cores_list    = [1, 2, 4]
strong_times  = []
strong_rmse   = []

for cores in cores_list:
    # Restart spark with different cores
    spark.stop()
    spark = SparkSession.builder \
        .appName(f"Strong_Scaling_{cores}cores") \
        .master(f"local[{cores}]") \
        .config("spark.driver.memory", "6g") \
        .config("spark.sql.shuffle.partitions", "50") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    df = spark.read.parquet(DATA_PATH).sample(fraction=FRACTION, seed=42)
    tr, te = df.randomSplit([0.8, 0.2], seed=42)
    tr.cache(); tr.count()

    t = time.time()
    model = DecisionTreeRegressor(
        featuresCol="features", labelCol="log_price",
        maxDepth=5, maxBins=32
    ).fit(tr)
    elapsed = time.time() - t

    strong_times.append(elapsed)
    rows = tr.count()
    print(f"Cores: {cores} | Rows: {rows:,} | Time: {elapsed:.1f}s")
    tr.unpersist()

speedup = [strong_times[0]/t for t in strong_times]
efficiency = [s/c * 100 for s, c in zip(speedup, cores_list)]

print("\n--- Strong Scaling Summary ---")
for c, t, s, e in zip(cores_list, strong_times, speedup, efficiency):
    print(f"Cores: {c} | Time: {t:.1f}s | Speedup: {s:.2f}x | Efficiency: {e:.1f}%")

STRONG SCALING
Same data size, increasing cores



                                                                                

Cores: 1 | Rows: 2,469,749 | Time: 52.1s


                                                                                

Cores: 2 | Rows: 2,469,749 | Time: 28.2s




Cores: 4 | Rows: 2,469,260 | Time: 16.1s

--- Strong Scaling Summary ---
Cores: 1 | Time: 52.1s | Speedup: 1.00x | Efficiency: 100.0%
Cores: 2 | Time: 28.2s | Speedup: 1.85x | Efficiency: 92.3%
Cores: 4 | Time: 16.1s | Speedup: 3.25x | Efficiency: 81.2%


                                                                                

In [9]:
print("===== BOTTLENECK ANALYSIS =====\n")

df = spark.read.parquet(DATA_PATH).sample(fraction=0.1, seed=42)

# 1. I/O Bottleneck
t = time.time()
spark.read.parquet(DATA_PATH).sample(fraction=0.1, seed=42).count()
io_time = time.time() - t
print(f"[1] I/O Time (read+count)      : {io_time:.2f}s")

# 2. Shuffle Bottleneck
t = time.time()
from pyspark.sql import functions as F
df.groupBy("log_price").count().collect()
shuffle_time = time.time() - t
print(f"[2] Shuffle Time (groupBy)     : {shuffle_time:.2f}s")

# 3. Memory Bottleneck (no cache)
tr, te = df.randomSplit([0.8, 0.2], seed=42)
t = time.time()
DecisionTreeRegressor(featuresCol="features", labelCol="log_price",
                       maxDepth=4, maxBins=32).fit(tr)
no_cache_time = time.time() - t
print(f"[3] Training (no cache)        : {no_cache_time:.2f}s")

# 4. With Cache
from pyspark import StorageLevel
tr.persist(StorageLevel.MEMORY_AND_DISK)
tr.count()
t = time.time()
DecisionTreeRegressor(featuresCol="features", labelCol="log_price",
                       maxDepth=4, maxBins=32).fit(tr)
cache_time = time.time() - t
print(f"[4] Training (with cache)      : {cache_time:.2f}s")
print(f"    Cache Speedup              : {no_cache_time/cache_time:.2f}x")
tr.unpersist()

# Identify main bottleneck
times_dict = {
    "I/O":     io_time,
    "Shuffle": shuffle_time,
    "Compute": no_cache_time
}
bottleneck = max(times_dict, key=times_dict.get)
print(f"\n Main Bottleneck: {bottleneck} ({times_dict[bottleneck]:.2f}s)")
print(f"Recommendation: ", end="")
if bottleneck == "I/O":
    print("Use Parquet format + partitioning strategy")
elif bottleneck == "Shuffle":
    print("Reduce shuffle partitions + use broadcast joins")
else:
    print("Use persist() + increase driver memory")

===== BOTTLENECK ANALYSIS =====

[1] I/O Time (read+count)      : 0.21s
[2] Shuffle Time (groupBy)     : 0.84s


                                                                                

[3] Training (no cache)        : 27.19s


[Stage 56:>                                                         (0 + 4) / 4]

[4] Training (with cache)      : 17.12s
    Cache Speedup              : 1.59x

 Main Bottleneck: Compute (27.19s)
Recommendation: Use persist() + increase driver memory


                                                                                