# Lab 9 - Amazon Reviews Classification with Spark ML

This notebook implements a machine learning pipeline to classify Amazon food reviews as "useful" or "useless" based on helpfulness ratings.

## Objectives:

1. **Step 1**: Preprocessing - Load and clean the dataset
2. **Step 2**: Create a pipeline with a single simple feature (text length)
3. **Step 3**: Add more features to improve classification
4. **Step 4**: Use text content for classification

## Classification Task:
- **Target**: Binary classification (useful vs useless)
- **Useful**: HelpfulnessNumerator/HelpfulnessDenominator > 0.9 (90%)
- **Useless**: HelpfulnessNumerator/HelpfulnessDenominator ≤ 0.9
- **Filter**: Only rated reviews (HelpfulnessDenominator > 0) are used

## Algorithms to Compare:
- **Decision Tree**
- **Logistic Regression**

## Import libraries and setup

In [None]:
# Import modules
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler, StringIndexer, IndexToString, SQLTransformer
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, RegexTokenizer, StopWordsRemover
from pyspark.ml.classification import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import udf, length, col, when, split, size, regexp_count
from pyspark.sql.types import LongType, BooleanType, DoubleType

## Configuration and data loading

In [None]:
# Path of the reviews CSV file read by the loading cell below.
# The active value is a sample file meant for local testing; to run against
# Reviews.csv on HDFS, comment it out and enable the alternative underneath.
inputData = "ReviewsSample-2.csv"  # For local testing
# inputData = "/data/students/bigdata-01QYD/Lab9/Reviews.csv"  # For HDFS

In [None]:
# Read the reviews CSV into a DataFrame: the first line is used as the header
# and the column types are inferred from the data.
reviews = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load(inputData)
)

# Quick sanity check of what was loaded: row count, schema and a few rows.
total_reviews = reviews.count()
print(f"Total reviews loaded: {total_reviews}")

print("\nSchema:")
reviews.printSchema()

print("\nFirst 3 reviews:")
reviews.show(3, truncate=False)

## Step 1: Preprocessing

### Filter rated reviews and create labels

In [None]:
# Keep only the rated reviews, i.e. those with a positive HelpfulnessDenominator
has_votes = col("HelpfulnessDenominator") > 0
rated_reviews = reviews.where(has_votes)

In [None]:
# Attach the helpfulness ratio and the binary class label to each rated review.
# label is 1.0 (useful) when the ratio is strictly above 0.9 and
# 0.0 (useless) otherwise.
ratio_expr = col("HelpfulnessNumerator") / col("HelpfulnessDenominator")
label_expr = when(col("helpfulness_ratio") > 0.9, 1.0).otherwise(0.0)

with_ratio = rated_reviews.withColumn("helpfulness_ratio", ratio_expr)
labeled_reviews = with_ratio.withColumn("label", label_expr)

### Split data into train and test sets

In [None]:
# Randomly partition the labeled reviews: 80% for training, 20% for testing.
# A fixed seed is passed to randomSplit.
train_part, test_part = labeled_reviews.randomSplit([0.8, 0.2], seed=42)

# Both partitions are reused by every pipeline below, so mark them as cached.
reviews_train = train_part.cache()
reviews_test = test_part.cache()

## Step 2: Simple Pipeline with Text Length Feature

### Decision Tree Classifier

In [None]:
# First solution: the feature vector holds one single value, the text length.

# Stage 1 - SQL transformer that appends the text_length column
simple_feature_sql = "SELECT *, length(Text) as text_length FROM __THIS__"
text_length_transformer = SQLTransformer(statement=simple_feature_sql)

# Stage 2 - pack text_length into the "features" vector column
vector_assembler_simple = VectorAssembler(inputCols=["text_length"], outputCol="features")

# Stage 3 - Decision Tree reading "features"/"label" and writing "prediction"
dt_classifier = DecisionTreeClassifier(featuresCol="features", labelCol="label", predictionCol="prediction")

# Chain the three stages into a single pipeline
simple_dt_stages = [text_length_transformer, vector_assembler_simple, dt_classifier]
pipeline_dt_simple = Pipeline(stages=simple_dt_stages)

In [None]:
# Train the simple Decision Tree pipeline on the training set
model_dt_simple = pipeline_dt_simple.fit(reviews_train)

# Score the test set and cache the predictions, which are evaluated several times
dt_simple_scored = model_dt_simple.transform(reviews_test)
predictions_dt_simple = dt_simple_scored.cache()

In [None]:
# Evaluators reused by every model in this notebook, one per metric:
# accuracy, F1, weighted recall and weighted precision.
evaluatorAcc, evaluatorF1, evaluatorRecall, evaluatorPrecision = [
    MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName=metric
    )
    for metric in ("accuracy", "f1", "weightedRecall", "weightedPrecision")
]

# Metrics of the Decision Tree trained on the text-length feature only
dt_simple_accuracy = evaluatorAcc.evaluate(predictions_dt_simple)
dt_simple_f1 = evaluatorF1.evaluate(predictions_dt_simple)
dt_simple_recall = evaluatorRecall.evaluate(predictions_dt_simple)
dt_simple_precision = evaluatorPrecision.evaluate(predictions_dt_simple)

dt_simple_report = (
    ("Accuracy", dt_simple_accuracy),
    ("F1", dt_simple_f1),
    ("Weighted Recall", dt_simple_recall),
    ("Weighted Precision", dt_simple_precision),
)
for caption, score in dt_simple_report:
    print(f"{caption}: {score:.4f}")

In [None]:
# Confusion matrix for the simple Decision Tree model
#                     Predicted
#  Actual       Useful   Useless
#  Useful          A        B
#  Useless         C        D

# One count per (actual, predicted) pair; class 1 is "useful", class 0 "useless".
dt_cells = {
    (actual, predicted): predictions_dt_simple.filter(
        f"prediction={predicted} and label={actual}"
    ).count()
    for actual in (1, 0)
    for predicted in (1, 0)
}
A_dt = dt_cells[(1, 1)]  # useful predicted as useful
B_dt = dt_cells[(1, 0)]  # useful predicted as useless
C_dt = dt_cells[(0, 1)]  # useless predicted as useful
D_dt = dt_cells[(0, 0)]  # useless predicted as useless

print("\nConfusion Matrix (Decision Tree):")
print("                       Predicted")
print("  Actual \t Useful\tUseless")
print(f"  Useful \t {A_dt}\t\t{B_dt}")
print(f"  Useless \t {C_dt}\t\t{D_dt}")

# Per-class precision and recall as (caption, correct count, denominator).
# A zero denominator means the metric is undefined for that class.
dt_per_class = (
    ("Precision(Useful)", A_dt, A_dt + C_dt),
    ("Recall(Useful)", A_dt, A_dt + B_dt),
    ("Precision(Useless)", D_dt, B_dt + D_dt),
    ("Recall(Useless)", D_dt, C_dt + D_dt),
)
for caption, hits, total in dt_per_class:
    if total:
        print(f"{caption}: {hits/total:.4f}")
    else:
        print(f"{caption}: undefined")

### Logistic Regression Classifier

In [None]:
# Logistic Regression reading "features"/"label" and writing "prediction"
lr_classifier = LogisticRegression(featuresCol="features", labelCol="label", predictionCol="prediction")

# Same preprocessing stages as the simple Decision Tree pipeline; only the
# final classifier differs.
simple_lr_stages = [text_length_transformer, vector_assembler_simple, lr_classifier]
pipeline_lr_simple = Pipeline(stages=simple_lr_stages)

# Train on the training set
model_lr_simple = pipeline_lr_simple.fit(reviews_train)

# Score the test set and cache the predictions for the evaluation below
lr_simple_scored = model_lr_simple.transform(reviews_test)
predictions_lr_simple = lr_simple_scored.cache()

In [None]:
# Metrics of the Logistic Regression trained on the text-length feature only
lr_simple_accuracy = evaluatorAcc.evaluate(predictions_lr_simple)
lr_simple_f1 = evaluatorF1.evaluate(predictions_lr_simple)
lr_simple_recall = evaluatorRecall.evaluate(predictions_lr_simple)
lr_simple_precision = evaluatorPrecision.evaluate(predictions_lr_simple)

lr_simple_report = (
    ("Accuracy", lr_simple_accuracy),
    ("F1", lr_simple_f1),
    ("Weighted Recall", lr_simple_recall),
    ("Weighted Precision", lr_simple_precision),
)
for caption, score in lr_simple_report:
    print(f"{caption}: {score:.4f}")

# Confusion matrix: one count per (actual, predicted) pair,
# where class 1 is "useful" and class 0 is "useless".
lr_cells = {
    (actual, predicted): predictions_lr_simple.filter(
        f"prediction={predicted} and label={actual}"
    ).count()
    for actual in (1, 0)
    for predicted in (1, 0)
}
A_lr = lr_cells[(1, 1)]  # useful predicted as useful
B_lr = lr_cells[(1, 0)]  # useful predicted as useless
C_lr = lr_cells[(0, 1)]  # useless predicted as useful
D_lr = lr_cells[(0, 0)]  # useless predicted as useless

print("\nConfusion Matrix (Logistic Regression):")
print("                       Predicted")
print("  Actual \t Useful\tUseless")
print(f"  Useful \t {A_lr}\t\t{B_lr}")
print(f"  Useless \t {C_lr}\t\t{D_lr}")

## Step 3: Enhanced Features Pipeline

In [None]:
# Create multiple features based on the review characteristics.
#
# NOTE: HelpfulnessNumerator and HelpfulnessDenominator are deliberately NOT
# used as features. The label is derived from their ratio, so feeding them to
# the classifier would leak the target and inflate every reported metric.
#
# summary_length falls back to 0 when Summary is NULL: length(NULL) is NULL,
# and VectorAssembler rejects NULL inputs with its default handleInvalid="error".
enhanced_features_transformer = SQLTransformer(
    statement="""
    SELECT *,
           length(Text) as text_length,
           coalesce(length(Summary), 0) as summary_length,
           Score as review_score,
           CASE WHEN length(Text) > 100 THEN 1.0 ELSE 0.0 END as is_long_review,
           CASE WHEN Score >= 4 THEN 1.0 ELSE 0.0 END as is_positive_score
    FROM __THIS__
    """
)

# Vector assembler packing the engineered columns into the "features" vector.
# The order of inputCols is the order of the entries in the resulting vector.
vector_assembler_enhanced = VectorAssembler(
    inputCols=[
        "text_length",
        "summary_length",
        "review_score",
        "is_long_review",
        "is_positive_score",
    ],
    outputCol="features"
)

### Decision Tree with Enhanced Features

In [None]:
# Decision Tree pipeline built on the enhanced feature set
dt_enhanced_stages = [enhanced_features_transformer, vector_assembler_enhanced, dt_classifier]
pipeline_dt_enhanced = Pipeline(stages=dt_enhanced_stages)

# Train on the training set, then score and cache the test-set predictions
model_dt_enhanced = pipeline_dt_enhanced.fit(reviews_train)
dt_enhanced_scored = model_dt_enhanced.transform(reviews_test)
predictions_dt_enhanced = dt_enhanced_scored.cache()

print("\n=== Decision Tree with Enhanced Features - Results ===")
dt_enhanced_accuracy = evaluatorAcc.evaluate(predictions_dt_enhanced)
dt_enhanced_f1 = evaluatorF1.evaluate(predictions_dt_enhanced)
dt_enhanced_recall = evaluatorRecall.evaluate(predictions_dt_enhanced)
dt_enhanced_precision = evaluatorPrecision.evaluate(predictions_dt_enhanced)

dt_enhanced_report = (
    ("Accuracy", dt_enhanced_accuracy),
    ("F1", dt_enhanced_f1),
    ("Weighted Recall", dt_enhanced_recall),
    ("Weighted Precision", dt_enhanced_precision),
)
for caption, score in dt_enhanced_report:
    print(f"{caption}: {score:.4f}")

# Best-effort report of the feature importances: any failure is printed
# instead of aborting the cell.
try:
    feature_names = vector_assembler_enhanced.getInputCols()
    # The fitted classifier is the third stage of the pipeline model (index 2)
    dt_model = model_dt_enhanced.stages[2]
    importances = dt_model.featureImportances.toArray()

    print("\nFeature Importances:")
    # Pair each assembler input column with the importance at the same position
    for position, name in enumerate(feature_names):
        if position >= len(importances):
            break
        importance = importances[position]
        print(f"  {name}: {importance:.4f}")
except Exception as e:
    print(f"Could not extract feature importances: {e}")

### Logistic Regression with Enhanced Features

In [None]:
# Logistic Regression pipeline built on the enhanced feature set
lr_enhanced_stages = [enhanced_features_transformer, vector_assembler_enhanced, lr_classifier]
pipeline_lr_enhanced = Pipeline(stages=lr_enhanced_stages)

# Train on the training set, then score and cache the test-set predictions
model_lr_enhanced = pipeline_lr_enhanced.fit(reviews_train)
lr_enhanced_scored = model_lr_enhanced.transform(reviews_test)
predictions_lr_enhanced = lr_enhanced_scored.cache()

print("\n=== Logistic Regression with Enhanced Features - Results ===")
lr_enhanced_accuracy = evaluatorAcc.evaluate(predictions_lr_enhanced)
lr_enhanced_f1 = evaluatorF1.evaluate(predictions_lr_enhanced)
lr_enhanced_recall = evaluatorRecall.evaluate(predictions_lr_enhanced)
lr_enhanced_precision = evaluatorPrecision.evaluate(predictions_lr_enhanced)

lr_enhanced_report = (
    ("Accuracy", lr_enhanced_accuracy),
    ("F1", lr_enhanced_f1),
    ("Weighted Recall", lr_enhanced_recall),
    ("Weighted Precision", lr_enhanced_precision),
)
for caption, score in lr_enhanced_report:
    print(f"{caption}: {score:.4f}")

## Step 4: Text Content-Based Classification

In [None]:
# TF-IDF text processing stages, chained through their input/output columns:
# Text -> words -> filtered_words -> raw_features -> features

# 1. Split the review text into tokens, using the regex "\W" as the separator
tokenizer = RegexTokenizer(inputCol="Text", outputCol="words", pattern="\\W")

# 2. Drop stop words from the token list
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# 3. Hash the remaining tokens into a term-frequency vector of 1000 entries
tf_vector_size = 1000
hashing_tf = HashingTF(
    inputCol="filtered_words", outputCol="raw_features", numFeatures=tf_vector_size
)

# 4. Apply IDF to the term frequencies to obtain the TF-IDF "features" column
idf = IDF(inputCol="raw_features", outputCol="features")

### Decision Tree with Text Content

In [None]:
# Decision Tree on TF-IDF features: the four text stages, then the classifier
dt_text_stages = [tokenizer, stop_words_remover, hashing_tf, idf, dt_classifier]
pipeline_dt_text = Pipeline(stages=dt_text_stages)

# Train on the training set, then score and cache the test-set predictions
print("Training Decision Tree with text content...")
model_dt_text = pipeline_dt_text.fit(reviews_train)
dt_text_scored = model_dt_text.transform(reviews_test)
predictions_dt_text = dt_text_scored.cache()

print("\n=== Decision Tree with Text Content - Results ===")
dt_text_accuracy = evaluatorAcc.evaluate(predictions_dt_text)
dt_text_f1 = evaluatorF1.evaluate(predictions_dt_text)
dt_text_recall = evaluatorRecall.evaluate(predictions_dt_text)
dt_text_precision = evaluatorPrecision.evaluate(predictions_dt_text)

dt_text_report = (
    ("Accuracy", dt_text_accuracy),
    ("F1", dt_text_f1),
    ("Weighted Recall", dt_text_recall),
    ("Weighted Precision", dt_text_precision),
)
for caption, score in dt_text_report:
    print(f"{caption}: {score:.4f}")

### Logistic Regression with Text Content

In [None]:
# Logistic Regression on TF-IDF features: the four text stages, then the classifier
lr_text_stages = [tokenizer, stop_words_remover, hashing_tf, idf, lr_classifier]
pipeline_lr_text = Pipeline(stages=lr_text_stages)

# Train on the training set, then score and cache the test-set predictions
print("Training Logistic Regression with text content...")
model_lr_text = pipeline_lr_text.fit(reviews_train)
lr_text_scored = model_lr_text.transform(reviews_test)
predictions_lr_text = lr_text_scored.cache()

print("\n=== Logistic Regression with Text Content - Results ===")
lr_text_accuracy = evaluatorAcc.evaluate(predictions_lr_text)
lr_text_f1 = evaluatorF1.evaluate(predictions_lr_text)
lr_text_recall = evaluatorRecall.evaluate(predictions_lr_text)
lr_text_precision = evaluatorPrecision.evaluate(predictions_lr_text)

lr_text_report = (
    ("Accuracy", lr_text_accuracy),
    ("F1", lr_text_f1),
    ("Weighted Recall", lr_text_recall),
    ("Weighted Precision", lr_text_precision),
)
for caption, score in lr_text_report:
    print(f"{caption}: {score:.4f}")

## Results Comparison and Analysis

In [None]:
print("\n" + "="*80)
print("                    FINAL RESULTS COMPARISON")
print("="*80)

# Results summary: one (name, accuracy, f1, weighted precision, weighted recall)
# tuple per trained model, in the order the models were built above.
results = [
    ("Decision Tree - Simple (Text Length)", dt_simple_accuracy, dt_simple_f1, dt_simple_precision, dt_simple_recall),
    ("Logistic Regression - Simple (Text Length)", lr_simple_accuracy, lr_simple_f1, lr_simple_precision, lr_simple_recall),
    ("Decision Tree - Enhanced Features", dt_enhanced_accuracy, dt_enhanced_f1, dt_enhanced_precision, dt_enhanced_recall),
    ("Logistic Regression - Enhanced Features", lr_enhanced_accuracy, lr_enhanced_f1, lr_enhanced_precision, lr_enhanced_recall),
    ("Decision Tree - Text Content (TF-IDF)", dt_text_accuracy, dt_text_f1, dt_text_precision, dt_text_recall),
    ("Logistic Regression - Text Content (TF-IDF)", lr_text_accuracy, lr_text_f1, lr_text_precision, lr_text_recall)
]

# Size the model column from the longest name: a fixed width of 40 is shorter
# than some of the names, which pushed their numeric columns out of alignment.
name_width = max(len(entry[0]) for entry in results) + 2

header = f"{'Model':<{name_width}} {'Accuracy':<10} {'F1':<10} {'Precision':<10} {'Recall':<10}"
print(header)
print("-" * len(header))

for model_name, accuracy, f1, precision, recall in results:
    print(f"{model_name:<{name_width}} {accuracy:<10.4f} {f1:<10.4f} {precision:<10.4f} {recall:<10.4f}")

# Find best performing models (index 1 is accuracy, index 2 is F1)
best_accuracy = max(results, key=lambda x: x[1])
best_f1 = max(results, key=lambda x: x[2])

print(f"\nBest Accuracy: {best_accuracy[0]} ({best_accuracy[1]:.4f})")
print(f"Best F1 Score: {best_f1[0]} ({best_f1[2]:.4f})")

## Analysis and Insights

In [None]:
def _relative_improvement(baseline, improved):
    """Return the change from baseline to improved as a percentage of baseline.

    Returns None when baseline is 0, because the relative change is undefined
    there and the division would raise ZeroDivisionError.
    """
    if baseline == 0:
        return None
    return ((improved - baseline) / baseline) * 100


def _format_improvement(value):
    """Render a percentage with an explicit sign, or "n/a" when undefined."""
    return "n/a" if value is None else f"{value:+.1f}%"


print("PERFORMANCE OBSERVATIONS:")
print(f"   - Best overall accuracy: {best_accuracy[1]:.4f} ({best_accuracy[0]})")
print(f"   - Best F1 score: {best_f1[2]:.4f} ({best_f1[0]})")

# Relative accuracy change when moving from the simple to the enhanced features
dt_improvement = _relative_improvement(dt_simple_accuracy, dt_enhanced_accuracy)
lr_improvement = _relative_improvement(lr_simple_accuracy, lr_enhanced_accuracy)

print("FEATURE ENGINEERING IMPACT:")
print(f"   - Decision Tree improvement: {_format_improvement(dt_improvement)} (simple -> enhanced)")
print(f"   - Logistic Regression improvement: {_format_improvement(lr_improvement)} (simple -> enhanced)")

## Sample Predictions Analysis

In [None]:
# Pull up to ten scored test reviews from the text-based Logistic Regression
# model, keeping only the columns needed for a side-by-side look.
preview_columns = [
    "Text", "label", "prediction", "helpfulness_ratio", "HelpfulnessNumerator", "HelpfulnessDenominator"
]
sample_predictions = predictions_lr_text.select(*preview_columns).limit(10)

for row in sample_predictions.collect():
    # Show at most the first 80 characters of the review text
    review_text = row['Text']
    if len(review_text) > 80:
        text_preview = review_text[:80] + "..."
    else:
        text_preview = review_text
    print(f"Text: {text_preview}")
    print(f"Helpfulness: {row['HelpfulnessNumerator']}/{row['HelpfulnessDenominator']} = {row['helpfulness_ratio']:.3f}")
    print(f"Actual: {row['label']}, Predicted: {row['prediction']}")
    # Mark whether the predicted class matches the actual label
    correct = "✗" if row['label'] != row['prediction'] else "✓"
    print(f"Result: {correct}")
    print("-" * 100)