## Initialize SparkSession

In [2]:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ProductDailyMetrics") \
    .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/19 02:30:40 WARN Utils: Your hostname, Kavanas-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.0.0.58 instead (on interface en0)
25/11/19 02:30:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/19 02:30:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/19 02:30:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/19 02:30:44 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/11/19 02:30:44 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


### Load Silver clean data

In [6]:

silver_path = "/Users/kavanamanvi/Desktop/AmazonReviews/processed/silver/reviews_clean"
df = spark.read.parquet(silver_path)

df.show(5)
df.printSchema()


[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+----------+------+-------------+--------------------+
|             user_id|      asin|rating|    timestamp|          event_time|
+--------------------+----------+------+-------------+--------------------+
|AG7FJJNE73SLYFJAY...|B072HX2DV8|   2.0|1509154387973|+49793-03-13 11:1...|
|AG7FJJNE73SLYFJAY...|B002I0JB6E|   1.0|1324856968000|+43953-01-17 10:2...|
|AELASOADJU4SUNNTF...|B084DDDNRP|   5.0|1613751527651|+53107-10-02 12:1...|
|AELASOADJU4SUNNTF...|B07GTT1Q92|   5.0|1555365983227|+51257-08-01 21:2...|
|AHIWJKGSBL5XAQKKS...|B07CZZC3GP|   5.0|1551407187448|+51132-02-20 10:5...|
+--------------------+----------+------+-------------+--------------------+
only showing top 5 rows
root
 |-- user_id: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- event_time: timestamp (nullable = true)



                                                                                

### Add a date column

In [9]:
from pyspark.sql.functions import to_date

df_with_date = df.withColumn("date", to_date("event_time"))
df_with_date.show(5)


+--------------------+----------+------+-------------+--------------------+------------+
|             user_id|      asin|rating|    timestamp|          event_time|        date|
+--------------------+----------+------+-------------+--------------------+------------+
|AG7FJJNE73SLYFJAY...|B072HX2DV8|   2.0|1509154387973|+49793-03-13 11:1...|+49793-03-13|
|AG7FJJNE73SLYFJAY...|B002I0JB6E|   1.0|1324856968000|+43953-01-17 10:2...|+43953-01-17|
|AELASOADJU4SUNNTF...|B084DDDNRP|   5.0|1613751527651|+53107-10-02 12:1...|+53107-10-02|
|AELASOADJU4SUNNTF...|B07GTT1Q92|   5.0|1555365983227|+51257-08-01 21:2...|+51257-08-01|
|AHIWJKGSBL5XAQKKS...|B07CZZC3GP|   5.0|1551407187448|+51132-02-20 10:5...|+51132-02-20|
+--------------------+----------+------+-------------+--------------------+------------+
only showing top 5 rows


## Prepare Aggregations

We need metrics per product per day.

### Metrics:

avg rating

review count

verified count (if present)

helpful vote sum (if present)

rating distribution

min/max

earliest/latest review date

In [12]:
# Import functions needed for aggregation
from pyspark.sql.functions import (
    avg, count, sum as spark_sum, min, max,
    countDistinct, expr, first, last
)


In [22]:
#Group by asin, date and compute aggregates


In [17]:
from pyspark.sql import functions as F

daily_metrics = df_with_date.groupBy("asin", "date").agg(
    F.avg("rating").alias("avg_rating"),
    F.count("*").alias("review_count"),
    F.min("rating").alias("min_rating"),
    F.max("rating").alias("max_rating"),

    F.sum((F.col("rating") == 1).cast("int")).alias("rating_1_count"),
    F.sum((F.col("rating") == 2).cast("int")).alias("rating_2_count"),
    F.sum((F.col("rating") == 3).cast("int")).alias("rating_3_count"),
    F.sum((F.col("rating") == 4).cast("int")).alias("rating_4_count"),
    F.sum((F.col("rating") == 5).cast("int")).alias("rating_5_count"),

    F.min("event_time").alias("first_review_date"),
    F.max("event_time").alias("last_review_date"),
)


## Write the gold dataset partitioned by date

In [21]:
daily_metrics.printSchema()
daily_metrics.show(10)

root
 |-- asin: string (nullable = true)
 |-- date: date (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- review_count: long (nullable = false)
 |-- min_rating: double (nullable = true)
 |-- max_rating: double (nullable = true)
 |-- rating_1_count: long (nullable = true)
 |-- rating_2_count: long (nullable = true)
 |-- rating_3_count: long (nullable = true)
 |-- rating_4_count: long (nullable = true)
 |-- rating_5_count: long (nullable = true)
 |-- first_review_date: timestamp (nullable = true)
 |-- last_review_date: timestamp (nullable = true)



[Stage 5:>                                                          (0 + 1) / 1]

+----------+------------+----------+------------+----------+----------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------------+
|      asin|        date|avg_rating|review_count|min_rating|max_rating|rating_1_count|rating_2_count|rating_3_count|rating_4_count|rating_5_count|   first_review_date|    last_review_date|
+----------+------------+----------+------------+----------+----------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------------+
|B000S0C2UI|+44721-04-14|       5.0|           1|       5.0|       5.0|             0|             0|             0|             0|             1|+44721-04-14 06:5...|+44721-04-14 06:5...|
|B0BW8N6X48|+55185-06-09|       1.0|           1|       1.0|       1.0|             1|             0|             0|             0|             0|+55185-06-09 09:1...|+55185-06-09 09:1...|
|B016K12406|+53753-07-06|       5.0|           1|      

                                                                                

In [25]:
output_path = "/Users/kavanamanvi/Desktop/AmazonReviews/processed/gold/product_daily_metrics"

daily_metrics.write \
    .mode("overwrite") \
    .partitionBy("date") \
    .parquet(output_path)


ERROR:root:KeyboardInterrupt while sending command.                 (0 + 4) / 4]
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/site-packages/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/socket.py", line 708, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 