In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, lower, split
from pyspark.ml.feature import StopWordsRemover

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DataCleaning") \
    .getOrCreate()

# Load the dataset
data = spark.read.json("hdfs:///user/bda/data")
print("Data Loaded Successfully!")
data.show(5)  # Inspect the first few rows


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/17 19:25:04 INFO SparkEnv: Registering MapOutputTracker
25/01/17 19:25:04 INFO SparkEnv: Registering BlockManagerMaster
25/01/17 19:25:04 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/01/17 19:25:04 INFO SparkEnv: Registering OutputCommitCoordinator
                                                                                

Data Loaded Successfully!
+----------+------------+--------------------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+
|      asin|helpful_vote|              images|parent_asin|rating|                text|    timestamp|               title|             user_id|verified_purchase|
+----------+------------+--------------------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+
|B00Z03RC80|           0|[{IMAGE, https://...| B00Z03RC80|   1.0|Opened the packag...|1616743454733|Gasoline!! Seriou...|AFKZENTNBQ7A7V7UX...|             true|
|B085PRT2MP|           0|[{IMAGE, https://...| B085PRT2MP|   1.0|Tops the list for...|1614915977684|Useless!  These h...|AFKZENTNBQ7A7V7UX...|             true|
|B08G81QQ9L|           0|                  []| B08G81QQ9L|   5.0|Bought this for m...|1612052493701|Hailey loves unic...|AFKZENTNBQ7A7V7UX...|             true|
|B07YYG7

In [2]:
print("Removing null or missing values...")
cleaned_data = data.na.drop(subset=["rating", "text", "user_id"])
print(f"Rows after removing null values: {cleaned_data.count()}")


Removing null or missing values...




Rows after removing null values: 23911390


                                                                                

In [3]:
print("Removing punctuation and special characters...")
cleaned_data = cleaned_data.withColumn(
    "cleaned_text",
    regexp_replace(lower(col("text")), r"[^\w\s]", "")  # Remove special characters
)


Removing punctuation and special characters...


In [4]:
cleaned_data.select("cleaned_text").show(5, truncate=False)  # Inspect cleaned text


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [5]:
print("Removing stopwords...")
tokenized_data = cleaned_data.withColumn("words", split(col("cleaned_text"), " "))

# Use StopWordsRemover to filter stopwords
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
final_data = stopwords_remover.transform(tokenized_data)

print("Stopwords removed.")
final_data.select("filtered_words").show(5, truncate=False)  # Inspect filtered text


Removing stopwords...
Stopwords removed.


[Stage 6:>                                                          (0 + 1) / 1]

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|filtered_words                                                                                                                                                               

                                                                                

In [6]:
final_data.select("words", "filtered_words").show(truncate=False)


[Stage 7:>                                                          (0 + 1) / 1]

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [7]:
# first 5 row
final_data.show(5, truncate=False)



[Stage 8:>                                                          (0 + 1) / 1]

+----------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [8]:
final_data.columns


['asin',
 'helpful_vote',
 'images',
 'parent_asin',
 'rating',
 'text',
 'timestamp',
 'title',
 'user_id',
 'verified_purchase',
 'cleaned_text',
 'words',
 'filtered_words']

In [9]:
# Basic statistics for numerical columns
final_data.select("rating", "helpful_vote").describe().show()



+-------+------------------+------------------+
|summary|            rating|      helpful_vote|
+-------+------------------+------------------+
|  count|          23911390|          23911390|
|   mean| 4.111858783617347|1.1507384137852295|
| stddev|1.4101429845296285| 13.16797033892482|
|    min|               1.0|                 0|
|    max|               5.0|             11931|
+-------+------------------+------------------+



                                                                                

In [10]:
# Distribution of ratings
final_data.groupBy("rating").count().orderBy("rating").show()

[Stage 14:>                                                         (0 + 1) / 1]

+------+--------+
|rating|   count|
+------+--------+
|   1.0| 2840878|
|   2.0| 1275031|
|   3.0| 1715821|
|   4.0| 2616444|
|   5.0|15463216|
+------+--------+



                                                                                

In [11]:
# Top frequent words
from pyspark.sql.functions import explode, col
word_frequency = final_data.withColumn("word", explode(col("filtered_words"))) \
                           .groupBy("word").count() \
                           .orderBy(col("count").desc())
word_frequency.show(10)



+-------+--------+
|   word|   count|
+-------+--------+
|       |13455433|
|   hair| 6582850|
|product| 5446835|
|  great| 4949366|
|   love| 4563188|
|   like| 4424369|
|    use| 4366756|
|   good| 3437961|
|   skin| 3048983|
|    one| 2944008|
+-------+--------+
only showing top 10 rows



                                                                                

In [12]:
# first 5 row
final_data.show(5, truncate=False)

+----------+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [15]:
# Save the cleaned data back to HDFS in Parquet format
final_data.write.parquet("hdfs:///user/bda/processed_data/")



                                                                                

In [18]:
spark.stop()