<a href="https://colab.research.google.com/github/tanvimagdum/Customer-Feedback-Analysis-and-Summarization/blob/main/MiniProject_smallDataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("datafiniti/consumer-reviews-of-amazon-products")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/datafiniti/consumer-reviews-of-amazon-products?dataset_version_number=5...


100%|██████████| 16.3M/16.3M [00:00<00:00, 66.5MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/datafiniti/consumer-reviews-of-amazon-products/versions/5


In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AmazonReviewsAnalysis").config("spark.driver.memory", "16g").getOrCreate()
df = spark.read.csv(path, sep=",", header=True, inferSchema=True)
df.printSchema()
df.show(5)

root
 |-- id: string (nullable = true)
 |-- dateAdded: string (nullable = true)
 |-- dateUpdated: string (nullable = true)
 |-- name: string (nullable = true)
 |-- asins: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- primaryCategories: string (nullable = true)
 |-- imageURLs: string (nullable = true)
 |-- keys: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- manufacturerNumber: string (nullable = true)
 |-- reviews.date: string (nullable = true)
 |-- reviews.dateSeen: string (nullable = true)
 |-- reviews.didPurchase: string (nullable = true)
 |-- reviews.doRecommend: string (nullable = true)
 |-- reviews.id: string (nullable = true)
 |-- reviews.numHelpful: string (nullable = true)
 |-- reviews.rating: string (nullable = true)
 |-- reviews.sourceURLs: string (nullable = true)
 |-- reviews.text: string (nullable = true)
 |-- reviews.title: string (nullable = true)
 |-- reviews.username: string (nul

In [5]:
# Data Preprocessing

from pyspark.sql.functions import col, lower, regexp_replace, length, length, trim, when, size
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Convert text to lowercase and remove punctuation
df_cleaned = df.withColumn("`reviews.text`", lower(col("`reviews.text`")))
df_cleaned = df_cleaned.withColumn("`reviews.text`", regexp_replace(col("`reviews.text`"), "[^a-zA-Z0-9\\s]", ""))

# Tokenization
tokenizer = Tokenizer(inputCol="`reviews.text`", outputCol="tokens")
df_tokenized = tokenizer.transform(df_cleaned)

# Remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
df_filtered = remover.transform(df_tokenized)

# Filter Reviews by Ratings
df_positive = df_filtered.filter(col("`reviews.rating`") >= 4)
df_negative = df_filtered.filter(col("`reviews.rating`") <= 2)

# Filter out NULL and empty values in review_body
df_filtered = df_filtered.filter(col("`reviews.text`").isNotNull())
df_filtered = df_filtered.filter(length(trim(col("`reviews.text`"))) > 0)

# Add Sentiment Column
df_filtered = df_filtered.withColumn(
    "sentiment",
    when(col("`reviews.rating`") >= 4, "positive")
    .when(col("`reviews.rating`") <= 2, "negative")
    .otherwise("neutral")
)

# Remove rows with empty filtered_tokens
df_filtered = df_filtered.filter(size(col("filtered_tokens")) > 0)


In [8]:
from pyspark.sql.functions import col, lower, regexp_replace, when, size, concat_ws, round
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Data Loading (Assuming df_cleaned is already created as per the provided script)

# Step 1: Split Dataset
train_data, test_data = df_filtered.randomSplit([0.8, 0.2], seed=42)  # 80-20 split

# Step 2: Fit CountVectorizer on Training Data
cv = CountVectorizer(inputCol="filtered_tokens", outputCol="key_phrases", vocabSize=50)
cv_model = cv.fit(train_data)

# Transform both training and testing data
train_transformed = cv_model.transform(train_data)
test_transformed = cv_model.transform(test_data)

# Step 3: Train a Classifier
# Add numeric labels for sentiment
train_transformed = train_transformed.withColumn(
    "label",
    when(col("sentiment") == "positive", 1.0)
    .when(col("sentiment") == "negative", 0.0)
    .otherwise(2.0)  # Neutral
)

test_transformed = test_transformed.withColumn(
    "label",
    when(col("sentiment") == "positive", 1.0)
    .when(col("sentiment") == "negative", 0.0)
    .otherwise(2.0)  # Neutral
)

# Train a Logistic Regression Model
lr = LogisticRegression(featuresCol="key_phrases", labelCol="label")
lr_model = lr.fit(train_transformed)

# Step 4: Make Predictions
predictions = lr_model.transform(test_transformed)

# Step 5: Evaluate the Model
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)

# Print Predictions and Accuracy
predictions.select("primaryCategories", "sentiment", "prediction").show(10, truncate=False)
print(f"Model Accuracy: {accuracy:.2f}")


+-----------------+---------+----------+
|primaryCategories|sentiment|prediction|
+-----------------+---------+----------+
|Electronics,Media|positive |1.0       |
|Electronics      |positive |1.0       |
|Electronics      |positive |1.0       |
|Electronics      |negative |1.0       |
|Electronics      |positive |2.0       |
|Health & Beauty  |positive |1.0       |
|Health & Beauty  |positive |1.0       |
|Health & Beauty  |positive |2.0       |
|Health & Beauty  |negative |1.0       |
|Health & Beauty  |positive |1.0       |
+-----------------+---------+----------+
only showing top 10 rows

Model Accuracy: 0.88


In [9]:
print(f"Model Accuracy: {accuracy:.2f}")

Model Accuracy: 0.88


In [10]:
from pyspark.sql.functions import concat_ws

# Combine tokens back to sentences
df_filtered = df_filtered.withColumn("summary", concat_ws(" ", col("filtered_tokens")))

# Summaries for each sentiment
df_positive_summary = df_filtered.filter(col("sentiment") == "positive").select("summary").show(10, truncate=False)
df_negative_summary = df_filtered.filter(col("sentiment") == "negative").select("summary").show(10, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|summary                                                                                                                                                             |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|bulk always less expensive way go products like                                                                                                                     |
|well duracell price happy.                                                                                                                                          |
|seem work well name brand batteries much better price                                                                                                               

In [12]:
insights = df_filtered.groupBy("primaryCategories", "sentiment").count()
insights.show()

+--------------------+---------+-----+
|   primaryCategories|sentiment|count|
+--------------------+---------+-----+
|         Electronics| negative|  434|
|Computers,Electro...|  neutral|    2|
|   Electronics,Media|  neutral|    3|
|     Health & Beauty| negative| 1146|
|Computers,Electro...| positive|    2|
|Fire Tablets,Comp...| positive|    1|
|Office Supplies,E...|  neutral|   36|
|Office Supplies,E...| positive|  605|
|   Electronics,Media| negative|    5|
|Electronics,Furni...| positive|    2|
|Animals & Pet Sup...|  neutral|    1|
|         Electronics|  neutral|  654|
|     Health & Beauty| positive|10390|
|   Electronics,Media| positive|  201|
|Toys & Games,Elec...| positive| 1529|
|     Health & Beauty|  neutral|  533|
|Fire Tablets,Comp...| negative|   81|
|Computers,Electro...| negative|   13|
|       Home & Garden| positive|    2|
|Animals & Pet Sup...| positive|    5|
+--------------------+---------+-----+
only showing top 20 rows



In [13]:
from pyspark.sql.functions import col, round

# Group by product_category and calculate total reviews
total_reviews = df_filtered.groupBy("primaryCategories").count().withColumnRenamed("count", "total_count")

# Join insights with total_reviews on product_category
insights_with_totals = insights.join(total_reviews, on="primaryCategories", how="inner")

# Add the percentage column
sentiment_stats = insights_with_totals.withColumn(
    "percentage",
    round((col("count") / col("total_count")) * 100, 2)
)

# Show the result
sentiment_stats.show()

+--------------------+---------+-----+-----------+----------+
|   primaryCategories|sentiment|count|total_count|percentage|
+--------------------+---------+-----+-----------+----------+
|Electronics,Furni...| positive|    2|          2|     100.0|
|Office Supplies,E...| negative|   10|        651|      1.54|
|Office Supplies,E...| positive|  605|        651|     92.93|
|Office Supplies,E...|  neutral|   36|        651|      5.53|
|         Electronics| positive|14905|      15993|      93.2|
|         Electronics|  neutral|  654|      15993|      4.09|
|         Electronics| negative|  434|      15993|      2.71|
|       Home & Garden| positive|    2|          2|     100.0|
|   Electronics,Media| positive|  201|        209|     96.17|
|   Electronics,Media| negative|    5|        209|      2.39|
|   Electronics,Media|  neutral|    3|        209|      1.44|
|Toys & Games,Elec...| negative|   54|       1676|      3.22|
|Toys & Games,Elec...|  neutral|   93|       1676|      5.55|
|Toys & 