In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, to_date, date_add, date_sub, collect_list, count, lit

spark = SparkSession.builder.appName("Item Co-review Analysis").getOrCreate()

# Load the dataset
reviews = spark.read.json("spark-public/datasets/reviews/reviews-5.json")
reviews.show()


+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|5555991584|  [3, 3]|    5.0|It's hard to beli...|09 12, 2006|A3EBHHCZO6V2A4|Amaranth "music fan"|Enya's last great...|    1158019200|
|5555991584|  [0, 0]|    5.0|A clasically-styl...| 06 3, 2001| AZPWAXJG9OJXV|           bethtexas|Enya at her most ...|     991526400|
|5555991584|  [2, 2]|    5.0|I never thought E...|07 14, 2003|A38IRL0X2T4DPF|         bob turnley|     The best so far|    1058140800|
|5555991584|  [1, 1]|    5.0|This is the third...| 05 3, 2000|A22IK3I6U76GX0|               Calle|Ireland produces ...|     957312000|
|5555991584|  [1, 1]|    4.0|Enya, despite bei...|01 17

In [3]:
from pyspark.sql.functions import col, to_date
reviews = reviews.withColumn("reviewDate", to_date(from_unixtime("unixReviewTime")))
reviews = reviews.withColumn("windowStart", date_sub(col("reviewDate"), 1))
reviews = reviews.withColumn("windowEnd", date_add(col("reviewDate"), 1))
reviews.show()

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+----------+-----------+----------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|reviewDate|windowStart| windowEnd|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+----------+-----------+----------+
|5555991584|  [3, 3]|    5.0|It's hard to beli...|09 12, 2006|A3EBHHCZO6V2A4|Amaranth "music fan"|Enya's last great...|    1158019200|2006-09-12| 2006-09-11|2006-09-13|
|5555991584|  [0, 0]|    5.0|A clasically-styl...| 06 3, 2001| AZPWAXJG9OJXV|           bethtexas|Enya at her most ...|     991526400|2001-06-03| 2001-06-02|2001-06-04|
|5555991584|  [2, 2]|    5.0|I never thought E...|07 14, 2003|A38IRL0X2T4DPF|         bob turnley|     The best so far|    1058140800|2003-07-14| 2003-07-1

In [4]:
related_reviews = reviews.alias("df1").join(
    reviews.alias("df2"),
    (col("df2.reviewDate") >= col("df1.windowStart")) &
    (col("df2.reviewDate") <= col("df1.windowEnd")) &
    (col("df1.asin") != col("df2.asin")),
    "inner"
).select(col("df1.asin"), col("df2.asin").alias("related_asin"))

In [18]:
output_df = related_reviews.groupBy("asin").agg(
    collect_list("related_asin").alias("other")
).orderBy("asin")
output_df.show()

+----------+--------------------+
|      asin|               other|
+----------+--------------------+
|5555991584|[B000000509, B000...|
|B0000000ZW|[B00000016W, B000...|
|B00000016T|[B000000OUU, B000...|
|B00000016W|[B00000051K, B000...|
|B00000017R|[B00000064F, B000...|
|B0000001P4|[B000000Y2R, B000...|
|B0000002HZ|[B0000004VW, B000...|
|B0000002J9|[5555991584, B000...|
|B0000002JR|[5555991584, B000...|
|B0000002ME|[B0000004TW, B000...|
|B0000002O5|[B00000064F, B000...|
|B0000003S0|[B0000024SN, B000...|
|B0000004TW|[B000000OMB, B000...|
|B0000004UM|[B00000017R, B000...|
|B0000004UO|[B000001A6N, B000...|
|B0000004UU|[B0000003S0, B000...|
|B0000004VK|[B000000OUU, B000...|
|B0000004VN|[B0000004VK, B000...|
|B0000004VW|[B0000004VK, B000...|
|B0000004X1|[B00000017R, B000...|
+----------+--------------------+
only showing top 20 rows



In [20]:
from pyspark.sql.functions import collect_list, slice

result = related_reviews.groupBy("df1.asin").agg(
    collect_list("related_asin").alias("other")
).withColumn("other", slice(col("other"), 1, 10))

result.show(truncate=False)

+----------+------------------------------------------------------------------------------------------------------------------------+
|asin      |other                                                                                                                   |
+----------+------------------------------------------------------------------------------------------------------------------------+
|B0000004UU|[B0000003S0, B0000004UO, B000001FUF, B0000009UT, B000001EG6, B000001FFJ, B0000025CY, B0000025OA, B000000I2R, B000000OUJ]|
|B00000053G|[B00000064G, B000000I0G, B000001F4X, B0000024I7, B0000024ZF, B00000055E, B00000055E, B000000WHB, B000000WJA, B000001EDN]|
|B000000541|[B00000054A, B00000054A, B000000Y5M, B000001A9C, B000001F5Y, B00000053X, B00000053X, B000000I0H, B000000OYK, B000001EUP]|
|B0000009QT|[B000001FQI, B000001Y15, B0000024MU, B0000025FJ, B000000W6X, B0000011XM, B0000013GT, B0000025DR, B0000025FJ, B000000YIQ]|
|B0000009UX|[B00000017R, B000001F38, B00000055E, B00000163G, B

In [5]:
from pyspark.sql.functions import col, from_unixtime, to_date, date_add, date_sub, collect_list, struct, slice, expr

related_reviews_counted = related_reviews.groupBy("df1.asin", "related_asin").count()

# Aggregate the results and limit to 10 items per asin
result = related_reviews_counted.groupBy("asin").agg(
    slice(
        expr("collect_list(struct(related_asin, count))"),
        1, 10
    ).alias("other")
)
result.show()

+----------+--------------------+
|      asin|               other|
+----------+--------------------+
|B0000004UU|[{B000001EG6, 1},...|
|B0000004X1|[{B0000024UX, 1},...|
|B00000053G|[{B000001A5X, 1},...|
|B000000541|[{B000001EUP, 1},...|
|B0000009QT|[{B0000024MU, 2},...|
|B0000009UX|[{B000001FKJ, 1},...|
|B0000009V1|[{B000001ZCS, 1},...|
|B000000EDW|[{B000001G07, 1},...|
|B000000HRP|[{B000000VC2, 1},...|
|B000000I0D|[{B000001F38, 1},...|
|B000000I0H|[{B0000025DR, 1},...|
|B000000OOE|[{B000001G0E, 1},...|
|B000000OSL|[{B00000253N, 1},...|
|B000000OUJ|[{B000000WJA, 1},...|
|B000000OWM|[{B000001E58, 1},...|
|B000000OXD|[{B000002KZQ, 1},...|
|B000000W2Z|[{B0000013GT, 1},...|
|B000000YIQ|[{B0000004UM, 2},...|
|B0000011P7|[{B000001EGA, 1},...|
|B000001A9C|[{B000000OUU, 1},...|
+----------+--------------------+
only showing top 20 rows



In [6]:
js = result.toJSON()

In [10]:
result_list = result.collect()
result_dict = [row.asDict() for row in result_list]

# Write the result to a JSON file
import json
with open("result.json", "w") as f:
    json.dump(result_dict, f, indent=2)