In [16]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [17]:
def init():
    """
    Build the Spark entry point used by the rest of the notebook.

    :return: the SparkSession obtained from ``getOrCreate()`` for an
        application named 'Assignment' with master 'local[*]'
    """
    builder = SparkSession.builder.appName('Assignment').master('local[*]')
    return builder.getOrCreate()


# Session shared by the read cells below.
spark_session = init()


In [18]:
# Folder holding the input CSV files; the read cells prefix it with '../'.
RESOURCES_PATH = 'data/'


In [53]:
import glob


In [67]:
# Explicit schema for product_info.csv: (column name, Spark type) pairs in
# file order. Every column is declared nullable.
product_info_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('product_id', StringType()),
        ('product_name', StringType()),
        ('brand_id', IntegerType()),
        ('brand_name', StringType()),
        ('loves_count', IntegerType()),
        ('rating', FloatType()),
        ('reviews', IntegerType()),
        ('size', StringType()),
        ('variation_type', StringType()),
        ('variation_value', StringType()),
        ('variation_desc', StringType()),
        ('ingredients', StringType()),
        ('price_usd', FloatType()),
        ('value_price_usd', FloatType()),
        ('sale_price_usd', FloatType()),
        ('is_limited_edition', IntegerType()),
        ('is_new', IntegerType()),
        ('is_out_of_stock', IntegerType()),
        ('is_online_only', IntegerType()),
        ('is_sephora_exclusive', IntegerType()),
        ('highlights', StringType()),
        ('primary_category', StringType()),
        ('secondary_category', StringType()),
        ('tertiary_category', StringType()),
        ('child_count', IntegerType()),
        ('child_max_price', DoubleType()),
        ('child_min_price', DoubleType()),
    )
])

# Explicit schema shared by all reviews*.csv files: (column name, Spark type)
# pairs in file order. Every column is declared nullable.
reviews_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('review_id', IntegerType()),
        ('author_id', LongType()),
        ('rating', IntegerType()),
        ('is_recommended', FloatType()),
        ('helpfulness', FloatType()),
        ('total_feedback_count', IntegerType()),
        ('total_neg_feedback_count', IntegerType()),
        ('total_pos_feedback_count', IntegerType()),
        ('submission_time', StringType()),
        ('review_text', StringType()),
        ('review_title', StringType()),
        ('skin_tone', StringType()),
        ('eye_color', StringType()),
        ('skin_type', StringType()),
        ('hair_color', StringType()),
        ('product_id', StringType()),
        ('product_name', StringType()),
        ('brand_name', StringType()),
        ('price_usd', FloatType()),
    )
])

# Load the product catalogue with the explicit schema; the first line of the
# file is treated as a header row.
product_info_df = (
    spark_session.read
    .schema(product_info_schema)
    .option('header', True)
    .csv(f'../{RESOURCES_PATH}/product_info.csv')
)

# Locate every reviews*.csv part. glob.glob returns matches in a
# filesystem-dependent order, so sort them to make the union order (and
# therefore what .show() prints) reproducible between runs and machines.
file_pattern = f"../{RESOURCES_PATH}/reviews*.csv"
file_paths = sorted(glob.glob(file_pattern))

# Fail here with a clear message instead of silently leaving reviews_df_merge
# undefined, which would only surface later as a NameError in another cell.
if not file_paths:
    raise FileNotFoundError(f"No review files match pattern {file_pattern!r}")

# Read each CSV part with the shared, predefined schema.
dataframes = [
    spark_session.read.csv(file_path, header=True, schema=reviews_schema)
    for file_path in file_paths
]

# Union the parts (positionally; they all share reviews_schema) into a single
# combined DataFrame.
reviews_df_merge = dataframes[0]
for df in dataframes[1:]:
    reviews_df_merge = reviews_df_merge.union(df)

In [78]:

# Preview the product catalogue. The default row limit matches the other
# preview cells; 100 rows of a 27-column frame is unreadable as text output.
product_info_df.show()

+----------+--------------------+--------+--------------+-----------+------+-------+--------------------+--------------------+--------------------+--------------+--------------------+---------+---------------+--------------+------------------+------+---------------+--------------+--------------------+--------------------+----------------+--------------------+--------------------+-----------+---------------+---------------+
|product_id|        product_name|brand_id|    brand_name|loves_count|rating|reviews|                size|      variation_type|     variation_value|variation_desc|         ingredients|price_usd|value_price_usd|sale_price_usd|is_limited_edition|is_new|is_out_of_stock|is_online_only|is_sephora_exclusive|          highlights|primary_category|  secondary_category|   tertiary_category|child_count|child_max_price|child_min_price|
+----------+--------------------+--------+--------------+-----------+------+-------+--------------------+--------------------+--------------------

In [79]:
# Preview the combined reviews frame built from all reviews*.csv parts.
reviews_df_merge.show()


+---------+-----------+------+--------------+-----------+--------------------+------------------------+------------------------+---------------+--------------------+--------------------+-----------+---------+-----------+----------+----------+--------------------+----------+---------+
|review_id|  author_id|rating|is_recommended|helpfulness|total_feedback_count|total_neg_feedback_count|total_pos_feedback_count|submission_time|         review_text|        review_title|  skin_tone|eye_color|  skin_type|hair_color|product_id|        product_name|brand_name|price_usd|
+---------+-----------+------+--------------+-----------+--------------------+------------------------+------------------------+---------------+--------------------+--------------------+-----------+---------+-----------+----------+----------+--------------------+----------+---------+
|        0| 1741593524|     5|           1.0|        1.0|                   2|                       0|                       2|     2023-02-01|I

In [80]:
def null_percentage(data: DataFrame) -> (DataFrame):
    """
    Compute the share of missing values in every column of ``data``.

    A value counts as missing when it is NULL, or, for float/double columns
    only, when it is NaN. The NaN test is restricted to those types because
    Spark's ``isnan`` rejects boolean, date and timestamp inputs with an
    analysis error, so applying it to every column breaks on frames that
    contain such columns.

    :param data: DataFrame to inspect
    :return: single-row DataFrame with the same column names as ``data``;
        each value is missing_count / row_count, i.e. a fraction in [0, 1]
        (not scaled to 0-100)
    """
    # count() runs a Spark job; do it once and reuse the result for every column.
    total_count = data.count()

    # Names of the columns for which NaN is a meaningful value.
    nan_capable = {
        field.name
        for field in data.schema.fields
        if isinstance(field.dataType, (FloatType, DoubleType))
    }

    def _is_missing(name):
        # NULL check for every type, plus NaN check where the type supports it.
        missing = F.col(name).isNull()
        if name in nan_capable:
            missing = missing | F.isnan(F.col(name))
        return missing

    # when(...) without otherwise() yields NULL for non-matching rows, and
    # count() skips NULLs, so this counts only the missing rows per column.
    null_counts = data.select([
        (F.count(F.when(_is_missing(c), c)) / total_count).alias(c)
        for c in data.columns
    ])
    return null_counts

# Calculate and show the share of missing values per product_info column.
# null_percentage divides by the row count without scaling, so values are
# fractions (0.0-1.0), not 0-100 percentages.
null_percentage_df_product = null_percentage(product_info_df)
null_percentage_df_product.show()

+----------+------------+--------------------+----------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+--------------------+------------------+------------------+
|product_id|product_name|            brand_id|brand_name|         loves_count|              rating|             reviews|               size|     variation_type|    variation_value|    variation_desc|        ingredients|           price_usd|   value_price_usd|    sale_price_usd|  is_limited_edition|              is_new|     is_out_of_stock|      is_online_only|is_sephora_exclusive|        highlights|    primary_category|  secondary_category|tertiary_category|         child_c

In [81]:
# Same missing-value check for the combined reviews frame; values are
# fractions of the row count (0.0-1.0).
null_percentage_df_reviews = null_percentage(reviews_df_merge)
null_percentage_df_reviews.show()

+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+------------------------+------------------------+------------------+--------------------+------------------+------------------+------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|           review_id|           author_id|              rating|     is_recommended|       helpfulness|total_feedback_count|total_neg_feedback_count|total_pos_feedback_count|   submission_time|         review_text|      review_title|         skin_tone|         eye_color|          skin_type|         hair_color|          product_id|        product_name|          brand_name|           price_usd|
+--------------------+--------------------+--------------------+-------------------+------------------+--------------------+------------------------+------------------------+------------------+---------------

## Cleaning the data
- According to the null percentages of `product_info`, three columns have more than 85% null values: `variation_desc`, `value_price_usd`, and `sale_price_usd`. We will drop these columns.
- `reviews` has no column with more than 80% null values, so we do not need to drop anything from it.

There is no specific rule for how high a percentage of null values is acceptable. Some people say 60% or 80%, but
I have not yet found a paper or article that supports either number. A data scientist would understand better what
their machine learning task needs, and I am not a data scientist.

In [74]:
def clean_transform(product_info: DataFrame, reviews: DataFrame) -> (DataFrame, DataFrame):
    """
    Clean both input frames and return the cleaned copies.

    Products: the five ``is_*`` flag columns are cast to boolean and the
    columns ``variation_desc``, ``value_price_usd`` and ``sale_price_usd``
    are removed. Reviews: ``is_recommended`` is cast to boolean.

    :param product_info: product frame containing the flag columns
    :param reviews: reviews frame containing ``is_recommended``
    :return: tuple
        - cleaned product frame
        - cleaned reviews frame
    """
    sparse_columns = ['variation_desc', 'value_price_usd', 'sale_price_usd']
    flag_columns = [
        'is_limited_edition',
        'is_new',
        'is_out_of_stock',
        'is_online_only',
        'is_sephora_exclusive',
    ]

    # Products: cast each flag column in place (column order is kept), then
    # drop the unwanted columns.
    products = product_info
    for flag in flag_columns:
        as_boolean = col(flag).cast(BooleanType())
        products = products.withColumn(flag, as_boolean)
    products = products.drop(*sparse_columns)

    # Reviews: only the recommendation flag needs converting.
    recommended = col('is_recommended').cast(BooleanType())
    reviews_cleaned = reviews.withColumn('is_recommended', recommended)

    return products, reviews_cleaned

In [75]:
# Apply the cleaning step to the raw product frame and the combined reviews frame.
clean_product_info_df, clean_reviews_df = clean_transform(product_info_df, reviews_df_merge)

In [76]:
# Preview the cleaned reviews; is_recommended is now a boolean column.
clean_reviews_df.show()

+---------+-----------+------+--------------+-----------+--------------------+------------------------+------------------------+---------------+--------------------+--------------------+-----------+---------+-----------+----------+----------+--------------------+----------+---------+
|review_id|  author_id|rating|is_recommended|helpfulness|total_feedback_count|total_neg_feedback_count|total_pos_feedback_count|submission_time|         review_text|        review_title|  skin_tone|eye_color|  skin_type|hair_color|product_id|        product_name|brand_name|price_usd|
+---------+-----------+------+--------------+-----------+--------------------+------------------------+------------------------+---------------+--------------------+--------------------+-----------+---------+-----------+----------+----------+--------------------+----------+---------+
|        0| 1741593524|     5|          true|        1.0|                   2|                       0|                       2|     2023-02-01|I

In [82]:
# Preview the cleaned products; variation_desc, value_price_usd and
# sale_price_usd have been dropped and the is_* flags are booleans.
clean_product_info_df.show()

+----------+--------------------+--------+----------+-----------+------+-------+---------------+--------------------+--------------------+--------------------+---------+------------------+------+---------------+--------------+--------------------+--------------------+----------------+------------------+--------------------+-----------+---------------+---------------+
|product_id|        product_name|brand_id|brand_name|loves_count|rating|reviews|           size|      variation_type|     variation_value|         ingredients|price_usd|is_limited_edition|is_new|is_out_of_stock|is_online_only|is_sephora_exclusive|          highlights|primary_category|secondary_category|   tertiary_category|child_count|child_max_price|child_min_price|
+----------+--------------------+--------+----------+-----------+------+-------+---------------+--------------------+--------------------+--------------------+---------+------------------+------+---------------+--------------+--------------------+-------------