# Data Cleaning and Preprocessing



In this notebook, we'll focus on cleaning and preprocessing our dataset for further analysis.



In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Cell_Phones_Analysis_Cleaning") \
    .getOrCreate()


23/10/30 17:03:32 WARN Utils: Your hostname, MacBook-Air-de-Ivan.local resolves to a loopback address: 127.0.0.1; using 192.168.0.10 instead (on interface en0)
23/10/30 17:03:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/30 17:03:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/30 17:03:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Setup and Data Loading

Import the PySpark SQL functions module and load the raw review data from JSON.

In [2]:
from pyspark.sql import functions as F

path_to_json = "../data_amazon/Cell_Phones_and_Accessories.json"
df = spark.read.json(path_to_json)


23/10/30 17:04:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


### Preliminary Inspection

Check for Null Values

In [3]:
# Count, for every column, the number of rows in which that column is null
null_counts = df.select([F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns])
null_counts.show()




+----+-------+-------+----------+----------+----------+------------+-------+-------+--------------+--------+-------+
|asin|  image|overall|reviewText|reviewTime|reviewerID|reviewerName|  style|summary|unixReviewTime|verified|   vote|
+----+-------+-------+----------+----------+----------+------------+-------+-------+--------------+--------+-------+
|   0|9880950|      0|      9373|         0|         0|         727|5046121|   5519|             0|       0|9373510|
+----+-------+-------+----------+----------+----------+------------+-------+-------+--------------+--------+-------+



                                                                                

### Null Values Analysis

From the table above, we can observe the number of null or missing values for each column in the dataset:

- **asin**: There are no missing values.
- **image**: 9,880,950 entries are null.
- **overall**: No missing values.
- **reviewText**: 9,373 entries are null.
- **reviewTime**: No missing values.
- **reviewerID**: No missing values.
- **reviewerName**: 727 entries are null.
- **style**: 5,046,121 entries are null.
- **summary**: 5,519 entries are null.
- **unixReviewTime**: No missing values.
- **verified**: No missing values.
- **vote**: 9,373,510 entries are null.

This shows which columns have substantial missing data. The `image`, `style`, and `vote` columns are null in millions of rows, whereas the gaps in `reviewText`, `reviewerName`, and `summary` are comparatively small.
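To illustrate what the null-count query computes, here is a plain-Python sketch that counts missing values per column over a few hypothetical rows (dictionaries standing in for DataFrame rows). Treating an absent key as null mirrors how `spark.read.json` fills fields that a JSON record omits. It is only a sanity check of the logic on toy data, not a replacement for the Spark query.

```python
from typing import Any, Dict, Iterable, List


def count_nulls(rows: Iterable[Dict[str, Any]], columns: List[str]) -> Dict[str, int]:
    """Count, for each column, the rows whose value is None or missing entirely."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            # An absent key is treated like null, as spark.read.json does for omitted fields
            if row.get(c) is None:
                counts[c] += 1
    return counts


# Hypothetical rows for illustration only, not taken from the real dataset
sample = [
    {"asin": "A1", "reviewText": "Works well", "vote": None},
    {"asin": "A2", "reviewText": None, "vote": "3"},
    {"asin": "A3", "reviewText": "Fine"},
]

print(count_nulls(sample, ["asin", "reviewText", "vote"]))
# {'asin': 0, 'reviewText': 1, 'vote': 2}
```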


## Handling Missing Data

We'll address columns with missing values and decide whether to fill them or drop them based on the context.


### Dropping the `image` column
Given that image analysis is not part of our study, we'll proceed to drop the `image` column from our dataset.


In [4]:
# Dropping the 'image' column from the DataFrame
df = df.drop("image")


The `image` column has been successfully dropped from the dataset. We can verify this by inspecting the updated schema of our DataFrame.


In [5]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color Name:: string (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Design:: string (nullable = true)
 |    |-- Edition:: string (nullable = true)
 |    |-- Flavor Name:: string (nullable = true)
 |    |-- Flavor:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Hand Orientation:: string (nullable = true)
 |    |-- Item Display Length:: string (nullable = true)
 |    |-- Item Package Quantity:: string (nullable = true)
 |    |-- Length:: string (nullable = true)
 |    |-- Material Type:: string (nullable = true)
 |    |-- Material:: string (nullable = true)
 |    |-- Metal Type:: string (nullable = true)
 |    |-- Model:: string (nullable = 

### Imputing Missing Values in `reviewText` Column
The `reviewText` column has 9,373 missing entries. Rather than dropping these reviews, we impute the missing values with the placeholder text "No review text provided".


In [6]:
# Imputing missing values in the 'reviewText' column
df = df.na.fill({"reviewText": "No review text provided"})
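With a dictionary argument, `df.na.fill` replaces nulls only in the listed columns and leaves every other value, including nulls in other columns, untouched. The plain-Python sketch below mirrors that behaviour on a single row. `fill_missing` and the sample row are hypothetical; the dictionary collects the placeholders this notebook uses for `reviewText`, `reviewerName`, and `summary`.

```python
from typing import Any, Dict

# Placeholders used in this notebook for the three imputed text columns
PLACEHOLDERS = {
    "reviewText": "No review text provided",
    "reviewerName": "Anonymous",
    "summary": "No summary provided",
}


def fill_missing(row: Dict[str, Any], defaults: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of row where None values in the listed columns are replaced."""
    filled = dict(row)
    for column, value in defaults.items():
        if filled.get(column) is None:
            filled[column] = value
    return filled


# Hypothetical row: two text columns missing, plus a null in an unlisted column
row = {"reviewText": None, "reviewerName": "Jane Doe", "summary": None, "vote": None}
filled = fill_missing(row, PLACEHOLDERS)
# reviewText and summary receive placeholders; reviewerName and vote are unchanged
print(filled)
```

In PySpark, the same dictionary could be passed to a single `df.na.fill(...)` call instead of filling one column per cell.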



The missing values in the `reviewText` column have been successfully imputed. We can verify this by inspecting a few rows where the `reviewText` was previously missing.


In [7]:
# Displaying rows where 'reviewText' is "No review text provided"
df.filter(df["reviewText"] == "No review text provided").show()



+----------+-------+--------------------+-----------+--------------+-----------------+--------------------+--------------------+--------------+--------+----+
|      asin|overall|          reviewText| reviewTime|    reviewerID|     reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-------+--------------------+-----------+--------------+-----------------+--------------------+--------------------+--------------+--------+----+
|B0002SYC5O|    5.0|No review text pr...|10 18, 2015| AYDRLIY68ZXVI|  Amazon Customer|{null,  Black, nu...|          Five Stars|    1445126400|    true|null|
|B0007LA92G|    5.0|No review text pr...| 04 1, 2015|A21YM2AVDUT42X|    Tracey Barton|                null|          Five Stars|    1427846400|    true|null|
|B000A7ZWQK|    5.0|No review text pr...|06 28, 2016|A3OITH71ER9Q34|   Kevin Corcoran|{null,  193, null...|          Five Stars|    1467072000|    true|null|
|B000IXNEI4|    5.0|No review text pr...| 04 8, 2015

                                                                                

### Imputing Missing Values in `reviewerName` Column
The `reviewerName` column has 727 missing entries. Instead of leaving them as null, we replace them with the generic placeholder "Anonymous" to indicate that the reviewer's name was not provided.


In [8]:
# Imputing missing values in the 'reviewerName' column
df = df.na.fill({"reviewerName": "Anonymous"})


The missing values in the `reviewerName` column have been imputed with the "Anonymous" placeholder. We can check a few of the affected rows. Note that this filter also matches any reviewer whose actual name is "Anonymous", so it may return more than the 727 imputed rows.


In [9]:
# Displaying rows where 'reviewerName' is "Anonymous"
df.filter(df["reviewerName"] == "Anonymous").show()


+----------+-------+--------------------+-----------+--------------+------------+--------------------+--------------------+--------------+--------+----+
|      asin|overall|          reviewText| reviewTime|    reviewerID|reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-------+--------------------+-----------+--------------+------------+--------------------+--------------------+--------------+--------+----+
|9707716371|    3.0|Had this battery ...| 10 8, 2017| APOMPL5SOEWAJ|   Anonymous|                null|      Good not great|    1507420800|    true|null|
|9713248031|    1.0|Received this ite...|03 30, 2017|A1KGPKYBJNSTIQ|   Anonymous|{null,  White, nu...|not a samsung cha...|    1490832000|    true|null|
|9791151504|    5.0|Exactly what I ne...|07 27, 2016|A2GRSR5W5V5QVT|   Anonymous|                null|          Five Stars|    1469577600|    true|null|
|B0002W2H2K|    3.0|I have owned this...|08 21, 2005|A16D87D6594DF5|   Anonymous| 

### Dropping the `style` Column
Because the `style` column is null in 5,046,121 rows and is not needed for the current analysis, we'll drop it from the dataset.


In [10]:
# Dropping the 'style' column
df = df.drop("style")


The `style` column has been removed from the dataset. We can validate this by examining the updated schema of the DataFrame.


In [11]:
# Displaying the updated schema of the dataframe
df.printSchema()


root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = false)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = false)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: string (nullable = true)



### Imputing Missing Summaries
Given that there are 5,519 reviews without a summary, we'll replace these missing entries with a placeholder text "No summary provided" to ensure consistency within our dataset.


In [12]:
# Replacing missing summaries with the placeholder text
df = df.na.fill({"summary": "No summary provided"})


Missing summaries have been successfully replaced with the placeholder "No summary provided". We can confirm this by querying records that previously had missing summaries.


In [13]:
# Displaying some records with the placeholder summary
df.filter(df["summary"] == "No summary provided").show(truncate=False)



+----------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------+---------------------+-------------------+--------------+--------+----+
|asin      |overall|reviewText                                                                                                                                                                                                                                                                               

                                                                                

### Dropping the `vote` Column
The `vote` column is null in 9,373,510 rows, which suggests that most reviews never received a helpful vote. Since the column carries little information in its current form, we'll drop it.


In [14]:
# Dropping the 'vote' column from the DataFrame
df = df.drop("vote")


The `vote` column has been dropped from the dataset. We can confirm this by listing the DataFrame's columns.


In [15]:
# Displaying the DataFrame's columns
df.columns


['asin',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime',
 'verified']
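Dropping `vote` is one option. If the column were kept instead, a null would most plausibly mean the review received no helpful votes, and because the schema stores `vote` as a string it would need converting to a number. Below is a plain-Python sketch of that alternative, assuming (not verified here) that nulls mean zero and that large counts may contain thousands separators such as `"1,234"`. `parse_vote` is a hypothetical helper.

```python
from typing import Optional


def parse_vote(raw: Optional[str]) -> int:
    """Convert a raw vote string to an int.

    Assumptions (not verified against the full dataset): a null means zero
    helpful votes, and large counts may contain commas as thousands separators.
    """
    if raw is None:
        return 0
    return int(raw.replace(",", ""))


print([parse_vote(v) for v in [None, "3", "1,234"]])  # [0, 3, 1234]
```

Under the same assumptions, a rough PySpark equivalent would be `F.coalesce(F.regexp_replace("vote", ",", "").cast("int"), F.lit(0))`.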

## Data Transformation and Feature Engineering

Next, we'll engineer some features that might be useful for our analysis and models.


### Text Length Features

Because the length of a review or its summary may reflect how detailed the review is, we create two features holding the character length of `reviewText` and `summary` for each entry. These features let us look for relationships between review length and other factors such as sentiment, product rating, or verification status.

In [16]:
df = df.withColumn("reviewText_length", F.length(df["reviewText"]))
df = df.withColumn("summary_length", F.length(df["summary"]))

Results of Text Length Feature Creation

Two new columns, `reviewText_length` and `summary_length`, now hold the character length of `reviewText` and `summary`, respectively. With these features we can examine whether review or summary length relates to sentiment, product rating, or other factors. Keep in mind that rows imputed earlier with placeholder text get the placeholder's length rather than zero.
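One way to keep the "No review text provided" placeholder from distorting the length features is to give imputed rows no length at all, so they can be excluded from length-based analyses. A plain-Python sketch of that option, where `review_length` is a hypothetical helper:

```python
from typing import Optional

# Placeholder that this notebook writes into rows whose reviewText was null
REVIEW_PLACEHOLDER = "No review text provided"


def review_length(text: Optional[str], placeholder: str = REVIEW_PLACEHOLDER) -> Optional[int]:
    """Character length of a review, or None if it is missing or was imputed."""
    if text is None or text == placeholder:
        return None
    return len(text)


print(len(REVIEW_PLACEHOLDER))            # 23: what F.length reports for imputed rows
print(review_length(REVIEW_PLACEHOLDER))  # None
print(review_length("Great case"))        # 10
```

In PySpark, a comparable column could be built with `F.when(F.col("reviewText") != "No review text provided", F.length("reviewText"))`, which yields null wherever the condition is false.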

In [17]:
# Displaying the first few rows, focusing on new columns
df.select("reviewText", "reviewText_length", "summary", "summary_length").show(truncate=False)


+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------------------------------------------------------------------------------------+--------------+
|reviewText                                                                                                                                                                                            

## Temporal Features

The `unixReviewTime` column represents the timestamp of the review. We can derive multiple features from this single timestamp. One immediate conversion is to represent this timestamp in a more human-readable format.


In [18]:
from pyspark.sql.functions import from_unixtime

# Convert unixReviewTime to a readable date format
df = df.withColumn("reviewDate", from_unixtime("unixReviewTime").cast("date"))

# Display the first few rows to verify
df.select("unixReviewTime", "reviewDate").show()


+--------------+----------+
|unixReviewTime|reviewDate|
+--------------+----------+
|    1416355200|2014-11-18|
|    1416355200|2014-11-18|
|    1416355200|2014-11-18|
|    1416355200|2014-11-18|
|    1416355200|2014-11-18|
|    1416355200|2014-11-18|
|    1416355200|2014-11-18|
|    1416355200|2014-11-18|
|    1424390400|2015-02-19|
|    1423785600|2015-02-12|
|    1423440000|2015-02-08|
|    1423267200|2015-02-06|
|    1422835200|2015-02-01|
|    1422748800|2015-01-31|
|    1422316800|2015-01-26|
|    1422316800|2015-01-26|
|    1421971200|2015-01-22|
|    1421884800|2015-01-21|
|    1421884800|2015-01-21|
|    1421712000|2015-01-19|
+--------------+----------+
only showing top 20 rows



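The code above converts `unixReviewTime` into a calendar date stored in the `reviewDate` column, which is useful for studying how reviews are distributed over time and for deriving further temporal features. Note that `from_unixtime` interprets timestamps in the Spark session time zone. `unixReviewTime` appears to hold midnight UTC of each review date, so on a machine west of UTC the derived date falls one day early: 1416355200 is 2014-11-19 00:00 UTC but is shown above as 2014-11-18. Running `spark.conf.set("spark.sql.session.timeZone", "UTC")` before the conversion avoids this shift.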
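The plain-Python check below reads one timestamp from the data in UTC, parses the matching raw `reviewTime` string ("10 18, 2015" for timestamp 1445126400, from the sample rows earlier in this notebook), and reads the same timestamp at a hypothetical UTC-6 offset. The offset is an illustrative assumption, not the confirmed time zone of the machine that produced the output above.

```python
from datetime import date, datetime, timedelta, timezone


def review_date(unix_seconds: int, tz: timezone = timezone.utc) -> date:
    """Calendar date of a Unix timestamp as seen in the time zone tz."""
    return datetime.fromtimestamp(unix_seconds, tz=tz).date()


def parse_review_time(raw: str) -> date:
    """Parse the raw reviewTime format, e.g. '10 18, 2015' (month, day, year)."""
    return datetime.strptime(raw, "%m %d, %Y").date()


ts = 1445126400  # unixReviewTime of a sample row whose reviewTime is "10 18, 2015"
print(review_date(ts))                                 # 2015-10-18 (UTC)
print(parse_review_time("10 18, 2015"))                # 2015-10-18
print(review_date(ts, timezone(timedelta(hours=-6))))  # 2015-10-17 (hypothetical UTC-6)
```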


## Day of the Week Feature

Understanding the day of the week when a review was written can be insightful. For instance, there might be patterns indicating that users are more likely to write reviews on weekends or certain weekdays. Let's extract the day of the week from our `reviewDate` column.


In [19]:
from pyspark.sql.functions import date_format

# Extracting day of the week
df = df.withColumn("day_of_week", date_format("reviewDate", "EEEE"))

# Displaying the first few rows to verify
df.select("reviewDate", "day_of_week").show()


+----------+-----------+
|reviewDate|day_of_week|
+----------+-----------+
|2014-11-18|    Tuesday|
|2014-11-18|    Tuesday|
|2014-11-18|    Tuesday|
|2014-11-18|    Tuesday|
|2014-11-18|    Tuesday|
|2014-11-18|    Tuesday|
|2014-11-18|    Tuesday|
|2014-11-18|    Tuesday|
|2015-02-19|   Thursday|
|2015-02-12|   Thursday|
|2015-02-08|     Sunday|
|2015-02-06|     Friday|
|2015-02-01|     Sunday|
|2015-01-31|   Saturday|
|2015-01-26|     Monday|
|2015-01-26|     Monday|
|2015-01-22|   Thursday|
|2015-01-21|  Wednesday|
|2015-01-21|  Wednesday|
|2015-01-19|     Monday|
+----------+-----------+
only showing top 20 rows



The `day_of_week` column holds the full weekday name (pattern `EEEE`) derived from `reviewDate`, so it inherits any time-zone shift in that column. It can be used in aggregations to see how reviews are distributed across the days of the week.
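Models often need a numeric day rather than a name. Spark's `dayofweek` function returns 1 for Sunday through 7 for Saturday. The plain-Python sketch below reproduces that numbering from Python's `isoweekday()` (Monday = 1 through Sunday = 7), which is handy for spot-checking a few dates; `spark_style_dayofweek` is a hypothetical helper name.

```python
from datetime import date


def spark_style_dayofweek(d: date) -> int:
    """Return 1 for Sunday through 7 for Saturday, the numbering of Spark's dayofweek."""
    # isoweekday() gives Monday = 1 ... Sunday = 7; the modulo wraps Sunday to 0,
    # and adding 1 shifts every day up by one
    return d.isoweekday() % 7 + 1


for d in [date(2015, 2, 8), date(2015, 2, 9), date(2015, 1, 31)]:
    print(d, d.strftime("%A"), spark_style_dayofweek(d))
# 2015-02-08 Sunday 1
# 2015-02-09 Monday 2
# 2015-01-31 Saturday 7
```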


## Sentiment Analysis Feature

The star rating is a simple proxy for user satisfaction. Using the `overall` score, we'll label each review's sentiment as follows (this is rating-based labeling, not sentiment analysis of the review text):
- **Positive**: for scores greater than 3
- **Neutral**: for a score of 3
- **Negative**: for scores less than 3


In [20]:
from pyspark.sql.functions import when

# Classifying sentiment based on 'overall' score
df = df.withColumn("sentiment", 
                   when(df["overall"] > 3, "Positive")
                   .when(df["overall"] == 3, "Neutral")
                   .otherwise("Negative"))

# Displaying the first few rows to verify
df.select("overall", "sentiment").show()


+-------+---------+
|overall|sentiment|
+-------+---------+
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
|    5.0| Positive|
+-------+---------+
only showing top 20 rows



The `sentiment` column provides a classification of each review based on its "overall" score. This categorization can be used for various analyses, such as understanding the distribution of sentiments across the dataset or analyzing trends over time.
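The labeling rule can be written as a small plain-Python function, which is useful for unit-testing the thresholds without a Spark session. As with the `when`/`otherwise` chain, anything that is neither above nor equal to 3, including a missing rating, falls through to "Negative". This does not matter here because `overall` has no nulls, but it would silently mislabel missing ratings in another dataset.

```python
from typing import Optional


def sentiment_label(overall: Optional[float]) -> str:
    """Rating-based label: above 3 is Positive, exactly 3 is Neutral, otherwise Negative."""
    if overall is not None and overall > 3:
        return "Positive"
    if overall is not None and overall == 3:
        return "Neutral"
    # Like .otherwise(), this branch also receives missing ratings
    return "Negative"


print([sentiment_label(r) for r in [5.0, 4.0, 3.0, 2.0, 1.0, None]])
# ['Positive', 'Positive', 'Neutral', 'Negative', 'Negative', 'Negative']
```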


## Aggregate Features

Aggregating data at the product level can help identify products that are popular, highly rated, or have potential issues. In this section, we'll compute:
- **Average Rating**: The mean of the "overall" score for each product.
- **Review Count**: The total number of reviews each product has received.


In [21]:
from pyspark.sql import functions as F

# Group by 'asin' and compute aggregate features
product_aggregates = (df.groupBy("asin")
                      .agg(F.avg("overall").alias("avg_rating"), 
                           F.count("*").alias("review_count")))

# Joining the aggregates back to the main dataframe
df = df.join(product_aggregates, on="asin", how="left")

# Displaying the first few rows to verify
df.select("asin", "overall", "avg_rating", "review_count").show()




+----------+-------+------------------+------------+
|      asin|overall|        avg_rating|review_count|
+----------+-------+------------------+------------+
|B0046C9G2G|    3.0|               3.0|           1|
|B009OUAVOC|    1.0|               1.0|           1|
|B00HDQD40M|    5.0| 4.388888888888889|         126|
|B00HDQD40M|    3.0| 4.388888888888889|         126|
|B00HDQD40M|    3.0| 4.388888888888889|         126|
|B00HDQD40M|    5.0| 4.388888888888889|         126|
|B00HDQD40M|    4.0| 4.388888888888889|         126|
|B00HDQD40M|    5.0| 4.388888888888889|         126|
|B00HDQD40M|    5.0| 4.388888888888889|         126|
|B00HDQD40M|    5.0| 4.388888888888889|         126|
|B00HDQD40M|    5.0| 4.388888888888889|         126|
|B00HDQD40M|    5.0| 4.388888888888889|         126|
|B00HDQD40M|    2.0| 4.388888888888889|         126|
|B00HDQD40M|    4.0| 4.388888888888889|         126|
|B00HDQD40M|    5.0| 4.388888888888889|         126|
|B00HDRBELC|    3.0|1.9444444444444444|       

                                                                                

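The new columns `avg_rating` and `review_count` attach each product's mean rating and number of reviews to every one of its reviews (for example, all rows for `B00HDQD40M` carry an average of about 4.39 and a count of 126). These values help gauge overall sentiment toward a product and its popularity among reviewers. Because each product's `avg_rating` includes the review's own rating, it should not be used as an input feature when predicting `overall`, since that would leak the target.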
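A plain-Python sketch of the same per-product aggregation makes the semantics explicit: every review contributes to both the rating sum and the review count of its product. `product_aggregates` and the (asin, rating) pairs are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def product_aggregates(reviews: List[Tuple[str, float]]) -> Dict[str, Tuple[float, int]]:
    """Map each asin to (average rating, review count), like groupBy("asin").agg(...)."""
    totals: Dict[str, float] = defaultdict(float)
    counts: Dict[str, int] = defaultdict(int)
    for asin, overall in reviews:
        totals[asin] += overall
        counts[asin] += 1
    return {asin: (totals[asin] / counts[asin], counts[asin]) for asin in counts}


# Hypothetical (asin, overall) pairs, not taken from the real dataset
reviews = [("P1", 5.0), ("P1", 3.0), ("P1", 4.0), ("P2", 2.0)]
print(product_aggregates(reviews))  # {'P1': (4.0, 3), 'P2': (2.0, 1)}
```

In Spark, `F.avg` skips nulls while `F.count("*")` counts every row, so the two could disagree on a column with missing ratings; `overall` has none, so here they are consistent.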


## Boolean Features

Two-valued categorical features (e.g., Yes/No or True/False) are often encoded as 0/1 integers, which makes them easy to aggregate and to feed into models. In this dataset, the `verified` column indicates whether a review is verified; we'll convert it into a binary feature.


In [22]:
# Convert the boolean 'verified' column to a binary feature (1 for True, 0 for False or null)
df = df.withColumn("is_verified", F.when(F.col("verified"), 1).otherwise(0))

# Displaying the first few rows to verify
df.select("verified", "is_verified").show()


+--------+-----------+
|verified|is_verified|
+--------+-----------+
|   false|          0|
|   false|          0|
|   false|          0|
|   false|          0|
|   false|          0|
|   false|          0|
|   false|          0|
|   false|          0|
|    true|          1|
|    true|          1|
|    true|          1|
|    true|          1|
|    true|          1|
|    true|          1|
|    true|          1|
|   false|          0|
|    true|          1|
|    true|          1|
|    true|          1|
|    true|          1|
+--------+-----------+
only showing top 20 rows



The new `is_verified` column represents whether a review is verified in a binary format. This transformation simplifies the representation and can be directly used in analytical models.


## Saving Transformed Data

After all the data transformations and feature engineering, it's crucial to save the processed data. This will facilitate efficient retrieval in the subsequent steps of our analysis and modeling without the need to re-run all preprocessing steps.


In [23]:
# Define the path to save the processed data
save_path = "/Users/ivanozono/Proyects/big-data-cellphones-analysis/data_amazon/processed"

# Save the DataFrame in Parquet format (recommended for Spark)
df.write.parquet(save_path, mode="overwrite")


23/10/30 18:20:00 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/10/30 18:20:00 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/10/30 18:20:07 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/10/30 18:20:10 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/10/30 18:20:10 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/10/30 18:20:15 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/10/30 18:20:17 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014

The transformed data has been successfully saved to the "processed" directory in Parquet format. This format is columnar and optimized for big data processing tasks with Spark. Now, this processed dataset can be directly loaded for further analysis and modeling in the subsequent stages of our project.


## Concluding Remarks

In this notebook, we cleaned and enriched the raw Cell Phones and Accessories review data. Starting from the raw JSON, we:

1. **Addressed Missing Values**: We dropped columns that were mostly null or outside the scope of the study (`image`, `style`, `vote`) and replaced the remaining nulls in `reviewText`, `reviewerName`, and `summary` with explicit placeholders, so later steps do not need to handle missing values in these columns.
2. **Engineered Features**: We derived text lengths, a review date and day of the week, a rating-based sentiment label, per-product average rating and review count, and a binary verification flag.
3. **Transformed Data**: By converting timestamps, categorizing ratings, and aggregating at the product level, we reshaped the data into a form that is easier to analyze.
4. **Stored Processed Data**: Saving the result in Parquet format creates a checkpoint that later notebooks can load directly, without re-running the preprocessing.

The steps taken here lay the foundation for deeper analysis, visualization, and modeling. With a cleaned and enriched dataset, we are better equipped to extract meaningful insights and build predictive models in the subsequent phases of our project.

Next, we may explore the data visually to uncover patterns, trends, and anomalies, or move directly to building predictive models.
