# Optimized PySpark ML pipeline for classification (Amazon Reviews)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark import StorageLevel
import time

# ============================================================
# CONFIGURATION
# ============================================================
# Glob pattern (relative path) for the tab-separated review files to load.
DATA_PATH = "../datasets/Amazon Reviews/*.tsv" 
# Passed as maxIter when the LogisticRegression estimator is constructed.
MAX_LR_ITERATIONS = 100

# ============================================================
# STOP ANY EXISTING SPARK SESSION
# ============================================================
# Best-effort shutdown of a Spark session left over from an earlier run, so
# the benchmark starts from a session it configures itself.
try:
    spark_temp = SparkSession.getActiveSession()
    if spark_temp is not None:
        print("Stopping existing Spark session...")
        spark_temp.stop()
        time.sleep(2)  # Give it time to fully stop
except Exception as exc:
    # Failing to stop an old session must not abort the benchmark, but it
    # should not be silent either. Catching Exception (rather than using a
    # bare except) lets KeyboardInterrupt / SystemExit propagate.
    print(f"Could not stop existing Spark session: {exc}")

# ============================================================
# START TOTAL BENCHMARK TIMING
# ============================================================
# Wall-clock start of the whole benchmark; the end-to-end total is measured
# from this point.
TOTAL_BENCHMARK_START = time.time()

print("=" * 70)
print("PYSPARK OPTIMIZED BENCHMARK - AMAZON REVIEWS (Manual Features)")
print("=" * 70)
print("Optimizations: Kryo Serializer, Explicit Schema, Caching")

# ------------------------------------------------------------
# INITIALIZE SPARK SESSION (OPTIMIZED)
# ------------------------------------------------------------
print("\nInitializing Spark Session (with optimizations)...")
init_start = time.time()

# Local-mode session using every available core ("local[*]").
# The Kryo buffer ceiling uses the current key
# "spark.kryoserializer.buffer.max" with a size suffix; the older
# "spark.kryoserializer.buffer.max.mb" key is deprecated.
spark = (
    SparkSession.builder
    .appName("AmazonReviewsOptimizedManual")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "512m")
    .config("spark.sql.shuffle.partitions", "24")
    .config("spark.default.parallelism", "24")
    .getOrCreate()
)

# Only log errors so Spark's own logging does not drown the benchmark output.
spark.sparkContext.setLogLevel("ERROR")

init_time = time.time() - init_start
print(f"   Initialization time: {init_time:.2f}s")

# ------------------------------------------------------------
# DEFINE SCHEMA AND FEATURE EXTRACTION
# ------------------------------------------------------------
# Explicit schema for the review files: every column is declared as a
# nullable string, in the order listed here.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "marketplace",
        "customer_id",
        "review_id",
        "product_id",
        "product_parent",
        "product_title",
        "product_category",
        "star_rating",
        "helpful_votes",
        "total_votes",
        "vine",
        "verified_purchase",
        "review_headline",
        "review_body",
        "review_date",
    )
])

# Keyword lists used by extract_features_python. They live at module level so
# they are built once rather than once per review.
_POSITIVE_WORDS = (
    'great', 'good', 'excellent', 'love', 'perfect',
    'best', 'amazing', 'fantastic', 'wonderful', 'awesome',
)
_NEGATIVE_WORDS = (
    'bad', 'terrible', 'worst', 'hate', 'awful',
    'horrible', 'poor', 'disappointing', 'useless', 'waste',
)


def extract_features_python(text):
    """Turn one review body into a 10-element dense feature vector.

    The text is lower-cased first. The features, in order, are:

    0. number of characters
    1. number of whitespace-separated tokens
    2. number of '!' characters
    3. number of '?' characters
    4. total occurrences of the positive keywords
    5. total occurrences of the negative keywords
    6. occurrences of 'not'
    7. 1.0 if 'recommend' occurs, else 0.0
    8. 1.0 if 'return' occurs, else 0.0
    9. 1.0 if 'money back' occurs, else 0.0

    All keyword features use substring matching (``str.count`` / ``in``), so
    for example 'nothing' contributes to the 'not' count.

    Args:
        text: The review body, or None.

    Returns:
        A dense vector of 10 floats; all zeros when ``text`` is None.
    """
    if text is None:
        return Vectors.dense([0.0] * 10)

    text = str(text).lower()

    features = [
        float(len(text)),
        float(len(text.split())),
        float(text.count('!')),
        float(text.count('?')),
        float(sum(text.count(w) for w in _POSITIVE_WORDS)),
        float(sum(text.count(w) for w in _NEGATIVE_WORDS)),
        float(text.count('not')),
        1.0 if 'recommend' in text else 0.0,
        1.0 if 'return' in text else 0.0,
        1.0 if 'money back' in text else 0.0,
    ]
    return Vectors.dense(features)

# Wrap the Python feature extractor as a Spark UDF; VectorUDT() declares that
# the UDF returns an ML vector.
extract_features_udf = udf(extract_features_python, VectorUDT())

# ------------------------------------------------------------
# LOAD DATA WITH EXPLICIT SCHEMA
# ------------------------------------------------------------
print(f"\nLoading data with explicit schema from {DATA_PATH}...")
load_start = time.time()

# Read the tab-separated files with the predeclared schema; the first line of
# each file is treated as a header.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .option("sep", "\t")
    .csv(DATA_PATH)
)

# Keep only the rating (cast to int, null when the cast fails) and the review
# text, then drop every row in which either of the two is null.
data = df.selectExpr(
    "try_cast(star_rating as int) as star_rating",
    "review_body",
).na.drop()

# Binary label: ratings above 3 map to 1.0, everything else to 0.0.
data = data.withColumn(
    "label",
    when(col("star_rating") > 3, 1.0).otherwise(0.0),
)

# count() is the action that actually runs the read, so the reported load
# time covers reading and filtering the files.
total_records = data.count()
load_time = time.time() - load_start

print(f"   Loaded {total_records} records in {load_time:.2f}s")

# ------------------------------------------------------------
# EXTRACT FEATURES AND CACHE
# ------------------------------------------------------------
print("\nExtracting features & caching...")
prep_start = time.time()

# Apply the feature UDF to the review text and keep only the two columns the
# models need.
featurized_data = data.select(
    extract_features_udf(col("review_body")).alias("features"),
    "label",
)

# Mark the featurized table for persistence (memory first, spilling to disk).
featurized_data.persist(StorageLevel.MEMORY_AND_DISK)

# 70/30 train/test split with a fixed seed.
train_data, test_data = featurized_data.randomSplit(weights=[0.7, 0.3], seed=42)

# The test split is additionally cached on its own.
test_data.cache()

# Counting both splits forces the persisted data to be materialized.
train_count, test_count = train_data.count(), test_data.count()

prep_time = time.time() - prep_start

print(f"   Feature extraction & caching time: {prep_time:.2f}s")
print(f"   Training size: {train_count}, Test size: {test_count}")

# ------------------------------------------------------------
# TRAIN AND EVALUATE MODELS
# ------------------------------------------------------------
print("\nTraining and evaluating models...")

# Two evaluators over the same "label" / "prediction" columns, differing only
# in the metric they compute: accuracy first, then F1.
evaluator_acc, evaluator_f1 = (
    MulticlassClassificationEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName=metric,
    )
    for metric in ("accuracy", "f1")
)

def train_and_evaluate(model, name, total_count):
    """Fit ``model``, score the test split, and print a short report.

    Uses the module-level ``train_data`` and ``test_data`` DataFrames and the
    module-level ``evaluator_acc`` / ``evaluator_f1`` evaluators.

    Args:
        model: Estimator exposing ``fit``; the fitted model must expose
            ``transform``.
        name: Label used in the printed report.
        total_count: Record count used as the numerator of the throughput
            figure.

    Returns:
        Tuple ``(accuracy, f1, time_taken, throughput)``. ``time_taken`` is
        in seconds and covers fitting, prediction and both metric
        evaluations; ``throughput`` is ``total_count / time_taken`` (0 when
        the elapsed time is not positive).
    """
    print(f"\n   Training: {name}...")
    start = time.time()

    model_fit = model.fit(train_data)
    predictions = model_fit.transform(test_data)

    # Cache the predictions so the count and the two evaluations reuse the
    # same result; the finally clause releases the cache even if one of
    # those steps raises.
    predictions.cache()
    try:
        predictions.count()  # Force execution
        accuracy = evaluator_acc.evaluate(predictions)
        f1 = evaluator_f1.evaluate(predictions)
        time_taken = time.time() - start
    finally:
        predictions.unpersist()

    throughput = total_count / time_taken if time_taken > 0 else 0

    print(f"      [{name}]")
    print(f"      Accuracy:   {accuracy:.4f}")
    print(f"      F1 Score:   {f1:.4f}")
    print(f"      Time:       {time_taken:.2f}s")
    print(f"      Throughput: {throughput:,.0f} records/s")

    return accuracy, f1, time_taken, throughput

# One dict per trained model, appended in training order.
results = []
training_start = time.time()

# Logistic Regression
lr_model = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=MAX_LR_ITERATIONS,
)
lr_acc, lr_f1, lr_time, lr_tput = train_and_evaluate(
    model=lr_model,
    name="Logistic Regression",
    total_count=train_count,
)
results.append(dict(zip(
    ("Model", "Accuracy", "F1 Score", "Time", "Throughput"),
    ("Logistic Regression", lr_acc, lr_f1, lr_time, lr_tput),
)))

# Decision Tree
dt_model = DecisionTreeClassifier(
    maxDepth=5,
    featuresCol="features",
    labelCol="label",
)
dt_acc, dt_f1, dt_time, dt_tput = train_and_evaluate(
    model=dt_model,
    name="Decision Tree",
    total_count=train_count,
)
results.append(dict(zip(
    ("Model", "Accuracy", "F1 Score", "Time", "Throughput"),
    ("Decision Tree", dt_acc, dt_f1, dt_time, dt_tput),
)))

# Elapsed time across both model runs.
training_time = time.time() - training_start

# ------------------------------------------------------------
# CLEANUP
# ------------------------------------------------------------
print("\nCleaning up...")
cleanup_start = time.time()

# Release the persisted feature table and the cached test split.
for cached_frame in (featurized_data, test_data):
    cached_frame.unpersist()

cleanup_time = time.time() - cleanup_start
print(f"   Cleanup time: {cleanup_time:.2f}s")

# ============================================================
# END TOTAL BENCHMARK TIMING
# ============================================================
# Everything from TOTAL_BENCHMARK_START up to this point counts towards the
# end-to-end total.
TOTAL_BENCHMARK_END = time.time()
TOTAL_TIME = TOTAL_BENCHMARK_END - TOTAL_BENCHMARK_START

# ============================================================
# RESULTS SUMMARY
# ============================================================
# Per-phase timing table followed by the end-to-end total.
print("\n" + "=" * 70)
print("TIMING BREAKDOWN")
print("=" * 70)
print(f"Spark Initialization:   {init_time:>8.2f}s")
print(f"Data Loading:           {load_time:>8.2f}s")
print(f"Feature Extract+Cache:  {prep_time:>8.2f}s")
print(f"Training (both models): {training_time:>8.2f}s")
print(f"Cleanup:                {cleanup_time:>8.2f}s")
print("-" * 70)
print(f"TOTAL END-TO-END TIME:  {TOTAL_TIME:>8.2f}s")
print("=" * 70)

# Per-model results table.
print("\n" + "=" * 70)
print("--- Summary of OPTIMIZED Benchmark (Manual Features) ---")
print("=" * 70)
print(f"{'Model':<20} | {'Acc':<6} | {'F1':<6} | {'Time (s)':<9} | {'Throughput (rec/s)':<20}")
print("-" * 70)
for res in results:
    # Pad the time cell to the 9-character width of its header so the
    # columns of the data rows line up with the header row.
    time_cell = f"{res['Time']:.2f}s"
    print(f"{res['Model']:<20} | {res['Accuracy']:.4f} | {res['F1 Score']:.4f} | {time_cell:<9} | {res['Throughput']:,.0f}")

print("\n" + "=" * 70)
print("COMPARISON METRICS")
print("=" * 70)
print(f"Total Job Time:         {TOTAL_TIME:.2f}s")
print(f"Training Records:       {train_count}")
print(f"Test Records:           {test_count}")
print("=" * 70)

# Shut the session down now that all results have been reported.
spark.stop()

PYSPARK OPTIMIZED BENCHMARK - AMAZON REVIEWS (Manual Features)
Optimizations: Kryo Serializer, Explicit Schema, Caching

Initializing Spark Session (with optimizations)...
   Initialization time: 0.52s

Loading data with explicit schema from ../datasets/Amazon Reviews/*.tsv...


                                                                                

   Loaded 12106558 records in 7.37s

Extracting features & caching...


                                                                                

   Feature extraction & caching time: 42.04s
   Training size: 8473236, Test size: 3633322

Training and evaluating models...

   Training: Logistic Regression...


                                                                                

      [Logistic Regression]
      Accuracy:   0.7725
      F1 Score:   0.7359
      Time:       15.20s
      Throughput: 557,625 records/s

   Training: Decision Tree...


                                                                                

      [Decision Tree]
      Accuracy:   0.7839
      F1 Score:   0.7561
      Time:       20.35s
      Throughput: 416,466 records/s

Cleaning up...
   Cleanup time: 0.00s

TIMING BREAKDOWN
Spark Initialization:       0.52s
Data Loading:               7.37s
Feature Extract+Cache:     42.04s
Training (both models):    35.55s
Cleanup:                    0.00s
----------------------------------------------------------------------
TOTAL END-TO-END TIME:     85.51s

--- Summary of OPTIMIZED Benchmark (Manual Features) ---
Model                | Acc    | F1     | Time (s)  | Throughput (rec/s)  
----------------------------------------------------------------------
Logistic Regression  | 0.7725 | 0.7359 | 15.20s | 557,625
Decision Tree        | 0.7839 | 0.7561 | 20.35s | 416,466

COMPARISON METRICS
Total Job Time:         85.51s
Training Records:       8473236
Test Records:           3633322
