In [1]:
import os
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("big-data-project")
    .master("local[*]")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
    .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow")
    .getOrCreate()
)

print("Spark version:", spark.version)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/12 17:34:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 4.0.1


In [2]:
# Load the full Grocery_and_Gourmet_Food JSONL file

input_path = "../Grocery_and_Gourmet_Food.jsonl"
df = spark.read.json(input_path)
df.printSchema()
df.show(5, truncate=False)
df.count()

                                                                                

root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)

+----------+------------+------+-----------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

14318520

In [3]:
# 2) Add row numbers so we can split into 3 parts

from pyspark.sql.window import Window
import pyspark.sql.functions as F
w = Window.orderBy(F.monotonically_increasing_id())
df_indexed = df.withColumn("row_num", F.row_number().over(w))
df.show ()

+----------+------------+--------------------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+
|      asin|helpful_vote|              images|parent_asin|rating|                text|    timestamp|               title|             user_id|verified_purchase|
+----------+------------+--------------------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+
|B00CM36GAQ|           0|                  []| B00CM36GAQ|   5.0|Excellent!! Yummy...|1587854482395|  Excellent!  Yummy!|AFKZENTNBQ7A7V7UX...|             true|
|B074J5WVYH|           0|                  []| B0759B7KLH|   5.0|Excellent!  The b...|1587854400380|   Delicious!!! Yum!|AFKZENTNBQ7A7V7UX...|             true|
|B079TRNVHX|           1|                  []| B079TRNVHX|   5.0|These are very ta...|1587853224527|Extremely Delicio...|AFKZENTNBQ7A7V7UX...|             true|
|B07194LN2Z|           0|         

In [4]:
# 3) Compute sizes for three roughly equal parts

total_rows = df_indexed.count()
third = total_rows // 3
third, total_rows

                                                                                

(4772840, 14318520)

In [5]:
# 4) Build three DataFrames (1/3 each)
part1 = df_indexed.filter(F.col("row_num") <= third)
part2 = df_indexed.filter(
    (F.col("row_num") > third) & (F.col("row_num") <= 2 * third)
)
part3 = df_indexed.filter(F.col("row_num") > 2 * third)

In [6]:
# 5) Write each part as CSV (Spark will create folders with CSV files inside)

output_cols = [
    "rating", "title", "text", "asin", "parent_asin",
    "user_id", "timestamp", "helpful_vote", "verified_purchase"
]
part1.select(output_cols).write.csv("../data/grocery_reviews_part1", header=True, mode="overwrite")
part2.select(output_cols).write.csv("../data/grocery_reviews_part2", header=True, mode="overwrite")
part3.select(output_cols).write.csv("../data/grocery_reviews_part3", header=True, mode="overwrite")


25/12/12 17:34:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/12 17:34:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/12 17:34:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/12 17:35:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/12 17:35:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/12 17:36:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/12/12 1

In [7]:
df.printSchema()


root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)



In [8]:
import pyspark.sql.functions as F

string_cols = ["title", "text", "asin", "user_id"]      # string
numeric_cols = ["rating", "timestamp"]                  # numeric

null_counts = df.select(
    # real nulls on numeric cols
    *[F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in numeric_cols],
    # nulls or empty string on string cols
    *[F.count(F.when(F.col(c).isNull() | (F.col(c) == ""), c)).alias(c) for c in string_cols],
)

null_counts.show()




+------+---------+-----+----+----+-------+
|rating|timestamp|title|text|asin|user_id|
+------+---------+-----+----+----+-------+
|     0|        0|    0| 704|   0|      0|
+------+---------+-----+----+----+-------+



                                                                                

In [9]:
df.select("rating", "helpful_vote").describe().show()



+-------+------------------+------------------+
|summary|            rating|      helpful_vote|
+-------+------------------+------------------+
|  count|          14318520|          14318520|
|   mean| 4.122820375290184|0.9733498294516473|
| stddev|1.4419760356357534|14.683534093775904|
|    min|               0.0|                -1|
|    max|               5.0|             27883|
+-------+------------------+------------------+



                                                                                

In [10]:
df.groupBy("verified_purchase").count().show()
df.groupBy("rating").count().orderBy("rating").show()

                                                                                

+-----------------+--------+
|verified_purchase|   count|
+-----------------+--------+
|             true|13176519|
|            false| 1142001|
+-----------------+--------+





+------+-------+
|rating|  count|
+------+-------+
|   0.0|      1|
|   1.0|1855049|
|   2.0| 695428|
|   3.0| 898317|
|   4.0|1256795|
|   5.0|9612930|
+------+-------+



                                                                                