<a href="https://colab.research.google.com/github/mia-khan/DS5110/blob/MiniProject/DS_5110_Mini_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import kagglehub
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, StringType


# Data Loading and Setup
#=========================================================================================

# Step 1: Download the dataset using kagglehub
dataset_handle = "cynthiarempel/amazon-us-customer-reviews-dataset"
print(f"Downloading dataset: {dataset_handle}")
dataset_path = kagglehub.dataset_download(dataset_handle)
print(f"Dataset downloaded to: {dataset_path}")
#=========================================================================================
# Step 2: Initialize PySpark session
# The session is intentionally left running; later cells reuse `spark`.
spark = SparkSession.builder \
    .appName("Amazon Customer Reviews Analysis") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()
#=========================================================================================
# Step 3: List all files in the dataset directory

# Reset `df` so that a DataFrame left over from an earlier run of this cell can
# never be mistaken for the result of this run.
df = None

# NOTE(review): os.listdir() returns entries in arbitrary order, so which TSV is
# "first" may differ between machines/runs. Pin a specific file name if the
# analysis must always use the same category.
files = os.listdir(dataset_path)
print("Files in the dataset directory:", files)
#=========================================================================================
# Step 4: Load the first TSV file that can be read successfully
for file in files:
    if not file.endswith(".tsv"):
        print(f"Skipping unsupported file format: {file}")
        continue

    file_path = os.path.join(dataset_path, file)
    print(f"Reading TSV file: {file}")
    try:
        # Read the TSV file into a DataFrame
        df = spark.read.csv(file_path, header=True, inferSchema=True, sep='\t')

        # Display the first 4 records
        df.show(4)
    except Exception as e:
        # Best effort: report the problem and fall through to the next file.
        print(f"Error processing file {file}: {e}")
        df = None
        continue

    # Stop after processing the first file
    break

# Fail loudly here instead of letting later cells hit a NameError/None error.
if df is None:
    raise RuntimeError(f"No TSV file could be loaded from {dataset_path}")


Downloading dataset: cynthiarempel/amazon-us-customer-reviews-dataset
Resuming download from 53477376 bytes (22444254373 bytes left)...
Resuming download from https://www.kaggle.com/api/v1/datasets/download/cynthiarempel/amazon-us-customer-reviews-dataset?dataset_version_number=9 (53477376/22497731749) bytes left.


100%|██████████| 21.0G/21.0G [17:10<00:00, 21.8MB/s]


Extracting files...
Dataset downloaded to: /root/.cache/kagglehub/datasets/cynthiarempel/amazon-us-customer-reviews-dataset/versions/9
Files in the dataset directory: ['amazon_reviews_us_Major_Appliances_v1_00.tsv', 'amazon_reviews_us_Digital_Video_Games_v1_00.tsv', 'amazon_reviews_us_Video_Games_v1_00.tsv', 'amazon_reviews_us_Digital_Ebook_Purchase_v1_01.tsv', 'amazon_reviews_us_Wireless_v1_00.tsv', 'amazon_reviews_us_Camera_v1_00.tsv', 'amazon_reviews_us_Video_v1_00.tsv', 'amazon_reviews_us_Software_v1_00.tsv', 'amazon_reviews_us_Digital_Video_Download_v1_00.tsv', 'amazon_reviews_us_Office_Products_v1_00.tsv', 'amazon_reviews_us_Grocery_v1_00.tsv', 'amazon_reviews_us_Tools_v1_00.tsv', 'amazon_reviews_multilingual_US_v1_00.tsv', 'amazon_reviews_us_Watches_v1_00.tsv', 'amazon_reviews_us_Sports_v1_00.tsv', 'amazon_reviews_us_Apparel_v1_00.tsv', 'amazon_reviews_us_Baby_v1_00.tsv', 'amazon_reviews_us_Electronics_v1_00.tsv', 'amazon_reviews_us_Pet_Products_v1_00.tsv', 'amazon_reviews_us_Di

In [2]:
# Data Preprocessing
# Force the review text to a string column, then keep only rows that actually
# contain text (null and empty bodies are dropped).
df = df.withColumn("review_body", F.col("review_body").cast("string"))
has_review_text = F.col("review_body").isNotNull() & (F.col("review_body") != '')
df_clean = df.where(has_review_text)

# Tokenization and Cleaning
# Stage definitions first, then the transforms in pipeline order:
# tokenize -> remove stop words -> term counts -> TF-IDF.
regex_tokenizer = RegexTokenizer(inputCol="review_body", outputCol="words", pattern="\\W")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
vectorizer = CountVectorizer(inputCol="filtered", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")

words_data = regex_tokenizer.transform(df_clean)
filtered_data = remover.transform(words_data)

vectorized_model = vectorizer.fit(filtered_data)
vectorized_data = vectorized_model.transform(filtered_data)

idf_model = idf.fit(vectorized_data)
final_data = idf_model.transform(vectorized_data)

# Filtering
# Split into 1-2 star and 4-5 star reviews; 3-star reviews land in neither set.
star_rating = F.col("star_rating")
low_ratings = final_data.where((star_rating == 1) | (star_rating == 2))
high_ratings = final_data.where((star_rating == 4) | (star_rating == 5))


In [4]:
# Colab-only: mount the user's Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# Text Analysis and Processing

# VADER Sentiment Analysis setup
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Fetch the VADER lexicon via NLTK's downloader before building the analyzer.
nltk.download('vader_lexicon')
# Module-level analyzer instance; get_sentiment() below reads it as a global.
sia = SentimentIntensityAnalyzer()

# Function for sentiment score
def get_sentiment(text):
    """Return the VADER compound polarity score for ``text``.

    Args:
        text: Review text to score, or ``None``.

    Returns:
        The ``'compound'`` entry of ``sia.polarity_scores(text)``, or ``None``
        when ``text`` is ``None`` so that a null column value yields a null
        score instead of failing the whole Spark task.
    """
    if text is None:
        return None
    return sia.polarity_scores(text)['compound']

# Function to categorize sentiment based on score
def categorize_sentiment(score):
    """Map a compound sentiment score to a label.

    Args:
        score: Numeric score, a numeric string, or ``None``. Strings are
            accepted because the score column may be stored as text when the
            scoring UDF is registered with a string return type.

    Returns:
        ``"Positive"`` if the score is above 0.05, ``"Negative"`` if it is
        below -0.05, ``"Neutral"`` otherwise; ``None`` when ``score`` is
        ``None``.

    Raises:
        ValueError: If ``score`` is a string that is not a valid number.
    """
    if score is None:
        return None

    # Comparing a str with a float raises TypeError, so normalise first.
    score = float(score)

    if score > 0.05:
        return "Positive"
    elif score < -0.05:
        return "Negative"
    else:
        return "Neutral"

# Register the UDFs
from pyspark.sql.types import DoubleType

# get_sentiment returns a float, so the declared return type must be numeric;
# declaring StringType would hand the label UDF a stringified score.
sentiment_score_udf = udf(get_sentiment, DoubleType())
sentiment_label_udf = udf(categorize_sentiment, StringType())

# Key Phrase Extraction
from pyspark.sql import functions as F


def _with_sentiment_and_keywords(ratings_df):
    """Add `sentiment_score`, `sentiment` and `keywords` columns to a ratings DataFrame.

    `sentiment_score` is the VADER compound score of `review_body`, `sentiment`
    is its label, and `keywords` is the de-duplicated `filtered` token array.
    """
    scored = ratings_df.withColumn("sentiment_score", sentiment_score_udf(F.col("review_body")))
    labelled = scored.withColumn("sentiment", sentiment_label_udf(F.col("sentiment_score")))
    return labelled.withColumn("keywords", F.array_distinct(F.col("filtered")))


# Apply VADER and keyword extraction to the low_ratings df
low_ratings = _with_sentiment_and_keywords(low_ratings)

# Apply VADER and keyword extraction to the high_ratings df
high_ratings = _with_sentiment_and_keywords(high_ratings)

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


In [7]:
# Summarization
def create_summarized_dataset(df):
    """Project a reviews DataFrame down to the summary columns.

    The result keeps `product_category`, `sentiment`, `star_rating` and
    `helpful_votes`, and adds two derived string columns:

    * `key_phrases` - the `keywords` array joined with ", ".
    * `summarized_feedback` - the first three pieces of `review_body` after
      splitting on ".", "!" or "?", joined back together with ". ".
    """
    key_phrases = F.array_join("keywords", ", ").alias("key_phrases")

    sentence_pieces = F.split(F.col("review_body"), "[.!?]")
    leading_pieces = F.slice(sentence_pieces, 1, 3)
    summarized_feedback = F.array_join(leading_pieces, ". ").alias("summarized_feedback")

    return df.select(
        "product_category",
        "sentiment",
        key_phrases,
        summarized_feedback,
        "star_rating",
        "helpful_votes",
    )

# Combine ratings and create summary
combined_ratings = low_ratings.union(high_ratings)
summarized_data = create_summarized_dataset(combined_ratings)

# Save as CSV
# Spark writes a directory of part files at this path, not a single CSV file.
print("Saving summarized dataset...")
summarized_data.write.mode("overwrite") \
   .option("header", "true") \
   .csv("amazon_reviews_summarized.csv")

# Display sample
print("\n=== Summarized Dataset Sample ===")
summarized_data.show(5, truncate=False)

# Calculate category-level statistics
# The mean of a 0/1 "is positive" indicator is a fraction in [0, 1]; scale it
# by 100 and round so the `positive_percentage` column really holds a percentage.
is_positive = F.when(F.col("sentiment") == "Positive", 1).otherwise(0)
category_stats = summarized_data.groupBy("product_category") \
   .agg(
       F.count("*").alias("total_reviews"),
       F.avg("star_rating").alias("avg_rating"),
       F.round(F.avg(is_positive) * 100, 2).alias("positive_percentage")
   )

print("\n=== Category Statistics ===")
category_stats.show(truncate=False)

Saving summarized dataset...

=== Summarized Dataset Sample ===
+----------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+
|product_category|sentiment|key_phrases                                                                                                                                                                                         |summarized_feedback                                                                                                                |star_rating|helpful_votes|
+----------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
# Derive a sentiment label directly from the star rating:
# 4-5 stars -> Positive, 1-2 stars -> Negative, anything else -> Neutral.
star_rating = F.col("star_rating")
rating_sentiment = (
    F.when(star_rating >= 4, "Positive")
    .when(star_rating <= 2, "Negative")
    .otherwise("Neutral")
)
final_data = final_data.withColumn("sentiment", rating_sentiment)

print("Sample of data with sentiment:")
final_data.select("star_rating", "sentiment").show(5)

# Check available columns
print("\nAvailable columns:")
print(final_data.columns)

Sample of data with sentiment:
+-----------+---------+
|star_rating|sentiment|
+-----------+---------+
|          5| Positive|
|          5| Positive|
|          5| Positive|
|          5| Positive|
|          5| Positive|
+-----------+---------+
only showing top 5 rows


Available columns:
['marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent', 'product_title', 'product_category', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase', 'review_headline', 'review_body', 'review_date', 'words', 'filtered', 'raw_features', 'features', 'sentiment']


In [9]:
def analyze_category_patterns(df, category):
    """Find the most frequent tokens in negative and positive reviews of one category.

    Args:
        df: DataFrame with `product_category`, `sentiment` and `filtered`
            (token array) columns.
        category: Value of `product_category` to analyze.

    Returns:
        A pair of uncollected DataFrames `(issues, highlights)`: the five most
        frequent tokens among "Negative" rows (columns `issue`, `count`) and
        among "Positive" rows (columns `highlight`, `count`).
    """
    category_data = df.filter(F.col("product_category") == category)

    def top_tokens(sentiment_label, token_column):
        # Explode the token arrays, count occurrences, keep the five most common.
        matching = category_data.filter(F.col("sentiment") == sentiment_label)
        tokens = matching.select(F.explode("filtered").alias(token_column))
        counts = tokens.groupBy(token_column).count()
        return counts.orderBy(F.desc("count")).limit(5)

    # Negative issues analysis
    issues = top_tokens("Negative", "issue")

    # Positive highlights analysis
    highlights = top_tokens("Positive", "highlight")

    return issues, highlights

# Analyze patterns for each category
# Collect the distinct category values to the driver, then gather the top
# negative/positive tokens for each one into a plain dict of Row lists.
categories = final_data.select("product_category").distinct().collect()
category_insights = {}

for category in (row["product_category"] for row in categories):
    print(f"Processing category: {category}")
    issues_df, highlights_df = analyze_category_patterns(final_data, category)
    category_insights[category] = dict(
        top_issues=issues_df.collect(),
        top_highlights=highlights_df.collect(),
    )

Processing category: Major Appliances


In [10]:
# Aggregation and Insights
total_reviews = final_data.count()

# Single-row summary of ratings and helpful votes across all reviews.
overall_metrics = [
    F.avg("star_rating").alias("average_rating"),
    F.stddev("star_rating").alias("rating_stddev"),
    F.sum("helpful_votes").alias("total_helpful_votes"),
    F.avg("helpful_votes").alias("avg_helpful_votes"),
]
statistics = final_data.agg(*overall_metrics)

# Sentiment distribution
# Row count per sentiment label plus its share of all reviews, in percent.
sentiment_counts = final_data.groupBy("sentiment").agg(F.count("*").alias("count"))
share_of_total = F.round(F.col("count") / total_reviews * 100, 2)
sentiment_dist = sentiment_counts.withColumn("percentage", share_of_total)

print("=== Overall Statistics ===")
statistics.show()

print("\n=== Sentiment Distribution ===")
sentiment_dist.show()

=== Overall Statistics ===
+-----------------+------------------+-------------------+-----------------+
|   average_rating|     rating_stddev|total_helpful_votes|avg_helpful_votes|
+-----------------+------------------+-------------------+-----------------+
|3.716363223515812|1.6033305005192597|             420499|4.340052431673685|
+-----------------+------------------+-------------------+-----------------+


=== Sentiment Distribution ===
+---------+-----+----------+
|sentiment|count|percentage|
+---------+-----+----------+
| Positive|64858|     66.94|
|  Neutral| 6717|      6.93|
| Negative|25313|     26.13|
+---------+-----+----------+



In [11]:
# First, create the summarized data
# One row per (product_category, sentiment) pair with its review count and the
# mean star rating / helpful votes of the reviews in that group.
group_metrics = [
    F.count("*").alias("review_count"),
    F.avg("star_rating").alias("avg_rating"),
    F.avg("helpful_votes").alias("avg_helpful_votes"),
]
summarized_data = final_data.groupBy("product_category", "sentiment").agg(*group_metrics)

def generate_insights_report(summarized_data, category_insights):
    """Generate a comprehensive insights report with proper formatting of metrics.

    Args:
        summarized_data: DataFrame with one row per (product_category,
            sentiment) group and `product_category`, `review_count` and
            `avg_rating` columns. It is collected to the driver once.
        category_insights: Mapping of category -> dict with `"top_issues"` and
            `"top_highlights"`, each an iterable of rows exposing
            `["issue"]`/`["highlight"]` and `["count"]`.

    Returns:
        The report as a single string.

    Notes:
        * The overall average rating is weighted by `review_count`; a plain
          mean of the per-group averages would give a 10-review group the same
          weight as a 10,000-review group.
        * Mention percentages are token counts divided by the category's total
          review count, as before.
        * Missing data no longer raises: an empty input reports "N/A" for the
          average rating, and a category absent from `summarized_data` reports
          0 reviews and 0.0% shares.
    """
    # The grouped frame is tiny (categories x sentiments), so fetch it once and
    # aggregate in Python instead of launching a Spark job per category.
    rows = summarized_data.select("product_category", "review_count", "avg_rating").collect()

    total_reviews = 0
    rated_reviews = 0
    rating_sum = 0.0
    reviews_by_category = {}
    for row in rows:
        group_count = row["review_count"] or 0
        total_reviews += group_count
        group_category = row["product_category"]
        reviews_by_category[group_category] = reviews_by_category.get(group_category, 0) + group_count
        if row["avg_rating"] is not None:
            rating_sum += row["avg_rating"] * group_count
            rated_reviews += group_count

    avg_rating_text = f"{rating_sum / rated_reviews:.2f}" if rated_reviews else "N/A"

    def share(count, total):
        # Percentage of `count` relative to `total`, guarding against a zero total.
        return count / total * 100 if total else 0.0

    # Accumulate fragments and join once at the end.
    parts = [f"""Customer Feedback Analysis Report
===============================

Overall Statistics:

Total Reviews Analyzed: {total_reviews:,}
Average Rating: {avg_rating_text}

Key Findings by Category:"""]

    # Category-specific insights
    for category, insights in category_insights.items():
        total_category_reviews = reviews_by_category.get(category, 0)

        parts.append(f"\n\n{category}:")
        parts.append(f"\nTotal Reviews: {total_category_reviews:,}")

        parts.append("\n  Top Issues:")
        for issue in insights["top_issues"]:
            parts.append(
                f"\n    - {issue['issue']}: {issue['count']:,} mentions "
                f"({share(issue['count'], total_category_reviews):.1f}%)"
            )

        parts.append("\n  Top Highlights:")
        for highlight in insights["top_highlights"]:
            parts.append(
                f"\n    - {highlight['highlight']}: {highlight['count']:,} mentions "
                f"({share(highlight['count'], total_category_reviews):.1f}%)"
            )

    parts.append("""

Analysis Summary:
----------------
1. Review Volume Analysis:
   - Identified categories with highest review counts
   - Tracked review distribution across sentiment categories

2. Sentiment Patterns:
   - Analyzed positive vs negative sentiment ratios
   - Identified common themes in positive and negative feedback

3. Key Issues Identified:
   - Documented most frequent customer complaints
   - Tracked recurring product-specific concerns

4. Success Factors:
   - Highlighted features receiving positive feedback
   - Identified strongest product attributes

Actionable Recommendations:
-------------------------
1. Product Improvements:
   - Address frequently mentioned issues in each category
   - Focus on recurring technical or quality concerns

2. Marketing Opportunities:
   - Leverage positive aspects identified in reviews
   - Highlight well-received product features

3. Category Focus:
   - Prioritize categories with lower satisfaction scores
   - Implement targeted improvements for underperforming products

4. Monitoring Strategy:
   - Continue tracking trending topics
   - Establish regular review analysis schedule

5. Customer Service:
   - Address common service-related complaints
   - Enhance support for frequently mentioned issues
""")

    return "".join(parts)

# Function to analyze patterns with proper count handling
# NOTE(review): this redefines analyze_category_patterns from the earlier cell.
# That version returns uncollected DataFrames, and the per-category loop calls
# .collect() on its results, so re-running that loop after this cell has been
# executed will fail on the lists returned here.
def analyze_category_patterns(df, category):
    """Return the top tokens of negative and positive reviews for one category.

    Args:
        df: DataFrame with `product_category`, `sentiment` and `filtered`
            (token array) columns.
        category: Value of `product_category` to analyze.

    Returns:
        A pair of already-collected Row lists `(issues, highlights)`: the five
        most frequent tokens among "Negative" rows (fields `issue`, `count`)
        and among "Positive" rows (fields `highlight`, `count`).
    """
    category_data = df.filter(F.col("product_category") == category)

    def collect_top_tokens(sentiment_label, token_column):
        # Explode token arrays, count each token, keep the five most frequent,
        # and bring the result back to the driver.
        matching = category_data.filter(F.col("sentiment") == sentiment_label)
        tokens = matching.select(F.explode("filtered").alias(token_column))
        ranked = tokens.groupBy(token_column).count().orderBy(F.desc("count"))
        return ranked.limit(5).collect()

    # Negative issues analysis
    issues = collect_top_tokens("Negative", "issue")

    # Positive highlights analysis
    highlights = collect_top_tokens("Positive", "highlight")

    return issues, highlights


In [12]:
# Generate and save report
# Build the report text from the grouped summary and the per-category token
# rankings, then write it out as UTF-8.
insights_report = generate_insights_report(summarized_data, category_insights)
with open("customer_feedback_insights.txt", "w", encoding='utf-8') as report_file:
    report_file.write(insights_report)