In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, when

# Start (or reuse) a local Spark session that runs on every core of this machine.
spark_session = (
    SparkSession.builder
    .master("local[*]")
    .appName("Debugging Worker Errors")
    .getOrCreate()
)

# Input CSV of reviews, resolved relative to the notebook's working directory.
data = './data/reviews-250.csv'

In [10]:
# ---- DATAFRAME ----
# 1 - Read the reviews CSV: the first line provides column names and Spark
#     infers each column's type from the data.
df = (
    spark_session.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data)
)
# Preview the first 5 rows
df.show(n=5)
# Materialise every row on the driver as a list of Row objects.
# NOTE(review): `obj` is not used by any later cell visible here; harmless for a
# 250-row file, but consider removing it before pointing this at larger data.
obj = df.collect()

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|                Karl|            

In [11]:
# 2 - Keep only the reviews whose Score is exactly 5
high_rating_df = df.where(col("Score") == 5)
# With no argument, show() prints up to 20 rows
high_rating_df.show()

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  5|B006K2ZZ7K|A1UQRSCLF8GW1T|"Michael D. Bigha...|                   0|                     0|    5|1350777600|         Great taffy|Great taffy at a ...|
|  7|B006K2ZZ7K|A1SP2KVKFXXRU1|   David C. Sullivan|                   0|                     0|    5|1340150400|Great!  Just as g...|This saltwater ta...|
|  8|B006K2ZZ7K|A3JRGQVEQN31IQ|  Pamela G. Williams|            

In [12]:
# 3 - Select specific columns
selected_columns_df = df.select("ProductId", "Score", "Summary")
selected_columns_df.show(5)
# first() fetches a single row; the previous collect()[0] pulled the whole
# DataFrame onto the driver only to read its first element.
print(selected_columns_df.first().asDict())

+----------+-----+--------------------+
| ProductId|Score|             Summary|
+----------+-----+--------------------+
|B001E4KFG0|    5|Good Quality Dog ...|
|B00813GRG4|    1|   Not as Advertised|
|B000LQOCH0|    4|"""Delight"" says...|
|B000UA0QIQ|    2|      Cough Medicine|
|B006K2ZZ7K|    5|         Great taffy|
+----------+-----+--------------------+
only showing top 5 rows

{'ProductId': 'B001E4KFG0', 'Score': 5, 'Summary': 'Good Quality Dog Food'}


In [13]:
# 4 - Add HelpfulnessRatio column
# Rows with HelpfulnessDenominator == 0 (or NULL) get a NULL ratio.
# The guard is explicit because a bare division only yields NULL on divide-by-zero
# when ANSI mode is off; with spark.sql.ansi.enabled=true (default in Spark 4.x)
# it raises DIVIDE_BY_ZERO and the whole job fails.
df = df.withColumn(
    "HelpfulnessRatio",
    when(
        col("HelpfulnessDenominator") != 0,
        (col("HelpfulnessNumerator") / col("HelpfulnessDenominator")).cast("double"),
    ),  # no otherwise(): unmatched rows are NULL
)
df.show(5)

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+----------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|HelpfulnessRatio|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+----------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|             1.0|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|            NULL|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"

In [14]:
# 5 - Keep only rows with a strictly positive HelpfulnessDenominator
valid_helpfulness_df = df.where(col("HelpfulnessDenominator") > 0)
valid_helpfulness_df.show(n=5)

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+----------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|HelpfulnessRatio|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+----------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|             1.0|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|             1.0|
|  4|B000UA0QIQ|A395BORC6FGVXV|                Karl|                   3|                     3|    2|1307923200|      Cough Medicine|I

In [15]:
# 6 - Count how many different UserId values appear in the reviews
distinct_users_count = (
    df.select("UserId")
    .dropDuplicates()
    .count()
)
print(f"Distinct Users: {distinct_users_count}")

Distinct Users: 247


In [None]:
# 20 - Calculate average Score per ProductId
# `avg` is already imported in the first cell; no local re-import needed.
average_score_df = df.groupBy("ProductId").agg(avg("Score").alias("AverageScore"))
# groupBy output order is not deterministic; sort the preview so re-runs
# display the same 5 products.
average_score_df.orderBy("ProductId").show(5)