In [13]:
# ------------------------------------------------------------
# 1. Spark Setup and Data Reading
# ------------------------------------------------------------

# Create a SparkSession for the application
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AmazonReviewBalancedClassifier").getOrCreate()

print("\n📥 Reading raw data...")
# Load the Amazon review dataset (JSON format)
raw_df = spark.read.json(r"C:\Users\AYOUB KHABALI\Documents\GitHub\Amazon_Models_training-\code\train_data.json")



📥 Reading raw data...


In [14]:
# ------------------------------------------------------------
# 2. Data Preprocessing
# ------------------------------------------------------------

from pyspark.sql.functions import col, when, concat_ws, lit, trim, length, avg, count

# Combine 'summary' and 'reviewText' into a single 'text' column
df = raw_df.withColumn("text", 
    concat_ws(" ", 
        when(col("summary").isNull(), "").otherwise(col("summary")),
        when(col("reviewText").isNull(), "").otherwise(col("reviewText"))
    ))

# Replace null or empty reviews with the placeholder "empty_review"
df = df.withColumn("text", 
    when(col("text").isNull() | (trim(col("text")) == ""), lit("empty_review")).otherwise(col("text")))

# Display the class distribution
print("\n🧾 Original class distribution:")
df.groupBy("label").agg(count("*").alias("count")).orderBy("label").show()

# Display average review length per class
print("\n📝 Average review length by class:")
df.withColumn("text_length", length(col("text"))).groupBy("label").agg(avg("text_length")).show()



🧾 Original class distribution:
+-----+-----+
|label|count|
+-----+-----+
|    0|  420|
|    1|  695|
|    2| 8119|
+-----+-----+


📝 Average review length by class:
+-----+------------------+
|label|  avg(text_length)|
+-----+------------------+
|    0| 609.7857142857143|
|    1| 579.6086330935252|
|    2|483.21246458923514|
+-----+------------------+



In [16]:
# ------------------------------------------------------------
# 3. Balancing the Dataset
# ------------------------------------------------------------

from pyspark.sql.functions import rand

# Define the target number of samples per class
target_size = 1500

# Filter data by class
class_0 = df.filter(col("label") == 0)
class_1 = df.filter(col("label") == 1)
class_2 = df.filter(col("label") == 2)

# Oversampling function for balancing minority classes
def oversample(df_class, target):
    ratio = int(target / df_class.count()) + 1
    oversampled = df_class
    for _ in range(ratio - 1):
        oversampled = oversampled.union(df_class)
    return oversampled.orderBy(rand()).limit(target)

print("\n🔁 Balancing dataset...")

# Apply oversampling or downsampling to balance classes
balanced_0 = oversample(class_0, target_size)
balanced_1 = oversample(class_1, target_size)
balanced_2 = class_2.orderBy(rand()).limit(target_size)

# Merge and shuffle the balanced dataset
balanced_df = balanced_0.union(balanced_1).union(balanced_2).orderBy(rand())

# Display the new balanced class distribution
print("\n✅ Balanced class distribution:")
balanced_df.groupBy("label").agg(count("*").alias("count")).orderBy("label").show()



🔁 Balancing dataset...

✅ Balanced class distribution:
+-----+-----+
|label|count|
+-----+-----+
|    0| 1500|
|    1| 1500|
|    2| 1500|
+-----+-----+



In [47]:
# ------------------------------------------------------------
# 4. Pipeline: Tokenization → Stopword Removal → Vectorization
# ------------------------------------------------------------

from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline

# Tokenize the text column into words
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Remove stopwords
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Convert words into a vector of token counts
vectorizer = CountVectorizer(inputCol="filtered", outputCol="rawFeatures", vocabSize=10000, minDF=2)

# Compute the Inverse Document Frequency (IDF)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Convert string labels into indexed labels
indexer = StringIndexer(inputCol="label", outputCol="label_index")

# Define a Naive Bayes classifier
nb = NaiveBayes(featuresCol="features", labelCol="label_index", modelType="multinomial", smoothing=2.5)

# Combine all steps into a single ML pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, vectorizer, idf, indexer, nb])


In [48]:
# ------------------------------------------------------------
# 5. Train-Test Split and Training
# ------------------------------------------------------------

print("\n🚂 Starting training...")

# Split the balanced dataset into training (90%) and test (10%)
train_data, test_data = balanced_df.randomSplit([0.9, 0.1], seed=42)

# Train the model using the pipeline
model = pipeline.fit(train_data)



🚂 Starting training...


In [49]:
# ------------------------------------------------------------
# 6. Prediction
# ------------------------------------------------------------

print("\n🔎 Running prediction on test data...")

# Generate predictions using the trained model
predictions = model.transform(test_data)

# Convert predictions to Pandas DataFrame for evaluation
pdf = predictions.select("label_index", "prediction").toPandas()

# Ground truth and predictions
y_true = pdf["label_index"].astype(int)
y_pred = pdf["prediction"].astype(int)

# Optional: Map label indices to class names
label_names = {0: "negative", 1: "neutral", 2: "positive"}
y_true_named = [label_names[i] for i in y_true]
y_pred_named = [label_names[i] for i in y_pred]

# Evaluate performance using classification report and confusion matrix
from sklearn.metrics import classification_report, confusion_matrix

report = classification_report(y_true_named, y_pred_named, digits=2)
conf_matrix = confusion_matrix(y_true_named, y_pred_named, labels=["negative", "neutral", "positive"])

# Print results
print("\n=== Classification Report ===")
print(report)

print("=== Confusion Matrix ===")
print(conf_matrix)



🔎 Running prediction on test data...

=== Classification Report ===
              precision    recall  f1-score   support

    negative       0.80      0.81      0.81       131
     neutral       0.79      0.79      0.79       136
    positive       0.90      0.90      0.90       143

    accuracy                           0.83       410
   macro avg       0.83      0.83      0.83       410
weighted avg       0.83      0.83      0.83       410

=== Confusion Matrix ===
[[106  18   7]
 [ 22 107   7]
 [  4  11 128]]
