In [88]:
# ============================================
# Noelle's Notebook: Clean + Check
# br_reviews (Reviews) & br_payments (Payments)
# ============================================

# Install translation library (only runs once)

%pip install textblob
%pip install deep-translator


from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd
from textblob import TextBlob
from deep_translator import GoogleTranslator

StatementMeta(, cb2416a4-ed52-4f7a-a8ef-e8fb4a996b45, 119, Finished, Available, Finished)


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.



In [89]:
# Start Spark session
spark = SparkSession.builder.appName("NoelleCleaningEDA").getOrCreate()

StatementMeta(, cb2416a4-ed52-4f7a-a8ef-e8fb4a996b45, 121, Finished, Available, Finished)

In [90]:

# ============================================
# PART 1: CLEAN REVIEWS
# ============================================

# Load existing table
reviews_df = spark.read.table("br_reviews")
reviews_df.printSchema()
reviews_df.show(5)
reviewCount = reviews_df.count()
print("Total reviews:", reviewCount)

StatementMeta(, cb2416a4-ed52-4f7a-a8ef-e8fb4a996b45, 122, Finished, Available, Finished)

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|8278360118b83106a...|0558d3449b449e4da...|           4|                   7|                  NULL| 2018-07-11 00:00:00|    2018-07-14 15:22:36|
|3a0fd124d55874a20...|825318ce17c91088b...|           1|                

In [91]:
# remove duplicates
reviews_df = reviews_df.dropDuplicates(["review_id"])
# filter out invalid review scores
reviews_df = reviews_df.filter(F.col("review_score").between(1,5))
# add new column for yes or no review comment
reviews_df = reviews_df.withColumn("review_comment_present", F.when(F.col("review_comment_message").isNull(), "No").otherwise("Yes"))
reviews_df = reviews_df.withColumn("review_title_present", F.when(F.col("review_comment_title").isNull(), "No").otherwise("Yes"))
# check if review comment and title are both yes or no, count the rest as partial
reviews_df = reviews_df.withColumn("review_complete", 
                                   F.when((F.col("review_comment_present") == "Yes") & (F.col("review_title_present") == "Yes"), "Yes")
                                   .when((F.col("review_comment_present") == "No") & (F.col("review_title_present") == "No"), "No")
                                   .otherwise("Partial"))
# 
print("✅ Cleaned reviews rows:", reviews_df.count())
print("Removed ", reviewCount - reviews_df.count(), " rows with duplicate review_id")
reviews_df.printSchema()
reviews_df.show(5)

StatementMeta(, cb2416a4-ed52-4f7a-a8ef-e8fb4a996b45, 123, Finished, Available, Finished)

✅ Cleaned reviews rows: 98410
Removed  5752  rows with duplicate review_id
root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)
 |-- review_comment_present: string (nullable = false)
 |-- review_title_present: string (nullable = false)
 |-- review_complete: string (nullable = false)

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+----------------------+--------------------+---------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|review_comment_present|review_title_present|review_complete|
+------------

In [None]:
# Define schema for combined output
import time


schema = StructType([
    StructField("review_comment_message_en", StringType(), True),
    StructField("review_sentiment", StringType(), True),
    StructField("translation_status", StringType(), True)
])

@pandas_udf(schema)
def translate_and_sentiment(text_series: pd.Series) -> pd.DataFrame:
    text_series = text_series.fillna("")
    
    translated_list = []
    sentiment_list = []
    status_list = []

    for text in text_series:
        if text.strip() == "":
            translated_list.append("")
            sentiment_list.append("Neutral")
            status_list.append("empty")
        else:
            # Translation
            try:
                translated_text = GoogleTranslator(source='pt', target='en').translate(text)
                status_list.append("success")
                time.sleep(0.2)  # ~5 requests/sec max
            except Exception as e:
                translated_text = text  # fallback to original
                status_list.append(f"failed: {str(e)}")

            translated_list.append(translated_text)

            # Sentiment analysis
            try:
                polarity = TextBlob(translated_text).sentiment.polarity
                if polarity > 0:
                    sentiment_list.append("Positive")
                elif polarity < 0:
                    sentiment_list.append("Negative")
                else:
                    sentiment_list.append("Neutral")
            except Exception:
                sentiment_list.append("Neutral")

    return pd.DataFrame({
        "review_comment_message_en": translated_list,
        "review_sentiment": sentiment_list,
        "translation_status": status_list
    })

# Apply UDF
reviews_df = reviews_df.withColumn(
    "translation_and_sentiment",
    translate_and_sentiment(col("review_comment_message"))
)

# Split struct into separate columns
reviews_df = reviews_df.withColumn("review_comment_message_en", col("translation_and_sentiment.review_comment_message_en")) \
                       .withColumn("review_sentiment", col("translation_and_sentiment.review_sentiment")) \
                       .withColumn("translation_status", col("translation_and_sentiment.translation_status")) \
                       .drop("translation_and_sentiment")

# Show sample
reviews_df.select("review_comment_message", "review_comment_message_en", "review_sentiment", "translation_status").show(5, truncate=False)

reviews_df = reviews_df.cache()  # caches the computed column in memory
reviews_df.count()  # triggers computation

StatementMeta(, cb2416a4-ed52-4f7a-a8ef-e8fb4a996b45, 127, Finished, Available, Finished)

+--------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------+----------------+------------------+
|review_comment_message                                                                                                                |review_comment_message_en                                                                                                                       |review_sentiment|translation_status|
+--------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------+----------------+------------------+
|NULL                                         

98410

In [None]:
# print distribution of scores, with review completeness
reviews_df.groupBy("review_score", "review_complete").count().orderBy("review_score").show()

# print distribution of sentiments
reviews_df = reviews_df.repartition("review_sentiment")
reviews_df.groupBy("review_sentiment").count().show()

StatementMeta(, cb2416a4-ed52-4f7a-a8ef-e8fb4a996b45, 128, Finished, Available, Finished)

+------------+---------------+-----+
|review_score|review_complete|count|
+------------+---------------+-----+
|           1|             No| 2559|
|           1|            Yes| 1771|
|           1|        Partial| 6952|
|           2|        Partial| 1686|
|           2|             No|  976|
|           2|            Yes|  452|
|           3|            Yes|  733|
|           3|        Partial| 2882|
|           3|             No| 4482|
|           4|        Partial| 4811|
|           4|            Yes| 1430|
|           4|             No|12766|
|           5|        Partial|16260|
|           5|             No|35238|
|           5|            Yes| 5412|
+------------+---------------+-----+

+----------------+-----+
|review_sentiment|count|
+----------------+-----+
|        Positive| 4841|
|         Neutral|92615|
|        Negative|  954|
+----------------+-----+



In [97]:
# ---- Save ----
reviews_df.write.mode("overwrite").format("delta").saveAsTable("lh_silver_olist.sl_review")

# ---- Check & EDA ----
reviews_silver = spark.read.table("lh_silver_olist.sl_review")
reviews_silver.printSchema()
reviews_silver.show(5, truncate=False)
print("✅ Silver reviews rows:", reviews_silver.count())

StatementMeta(, cb2416a4-ed52-4f7a-a8ef-e8fb4a996b45, 129, Finished, Available, Finished)

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)
 |-- review_comment_present: string (nullable = true)
 |-- review_title_present: string (nullable = true)
 |-- review_complete: string (nullable = true)
 |-- review_comment_message_en: string (nullable = true)
 |-- review_sentiment: string (nullable = true)
 |-- translation_status: string (nullable = true)

+--------------------------------+--------------------------------+------------+--------------------+-------------------------------------------------------------------------------------------+--------------------+-----------------------+----------------------+--------------------+---------------+--------------------------------------------------