In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Recommended_System").getOrCreate()

In [2]:
df = spark.read.csv("eCommerce-dataset-samples\walmart-products.csv", header=True, inferSchema=True, sep=",")

In [3]:

df = spark.read.option("header", True) \
               .option("inferSchema", True) \
               .option("multiLine", True) \
               .option("escape", '"') \
               .csv("eCommerce-dataset-samples\walmart-products.csv")  # Đổi thành đường dẫn file của bạn

df.show()

+-------------------+--------------------+-----------+----------+--------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------------+------------------+--------------------+--------------------+------------+---------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------+--------------------+--------------------+------+----------+-----+-----+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------+--------------------+--------------------+
|          timestamp|                 url|final_price|       sku|currency|        gtin|      specifications|          image_urls|         top_reviews|        rating_stars|       related_pages|available_for_delivery|

In [9]:
df = spark.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("multiLine", True) \
    .load("eCommerce-dataset-samples\walmart-products.csv")
df.show()

+--------------------+--------------------+--------------------+----------+--------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------------+------------------+--------------------+--------------------+------------+---------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------+--------------------+--------------------+------+----------+-----+-----+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------+--------------------+--------------------+
|           timestamp|                 url|         final_price|       sku|currency|        gtin|      specifications|          image_urls|         top_reviews|        rating_stars|       related_pages|ava

In [10]:
df.select("specifications").show(5)

+--------------------+
|      specifications|
+--------------------+
|[{"name":"Brand",...|
|[{"name":"Brand",...|
|[{"name":"Feature...|
|[{"name":"Brand",...|
|[{"name":"Country...|
+--------------------+
only showing top 5 rows



In [11]:
df.select("specifications").show(5)

+--------------------+
|      specifications|
+--------------------+
|[{"name":"Brand",...|
|[{"name":"Brand",...|
|[{"name":"Feature...|
|[{"name":"Brand",...|
|[{"name":"Country...|
+--------------------+
only showing top 5 rows



In [None]:
df.select("top_reviews").collect()

In [None]:
df.select("rating_stars").collect()

In [53]:
set(df.select("categories").collect())

{Row(categories='["Arts Crafts & Sewing","Fabric","Shop All Fabric"]'),
 Row(categories='["Arts Crafts & Sewing","Scrapbooking","Stickers"]'),
 Row(categories='["Auto & Tires","Interior Parts & Accessories","Car Seat Cushions","Car Seat Cushions by Material","Air Car Seat Cushions"]'),
 Row(categories='["Baby","Diapering","Diapers","Size 4 Diapers"]'),
 Row(categories='["Baby","Feeding","Baby Food","Baby Snack Foods"]'),
 Row(categories='["Baby","Health & Safety","Baby Gates","Retractable baby gates"]'),
 Row(categories='["Baby","Health & Safety","Baby Medicine Cabinet","Baby Itching & Rash Treatments"]'),
 Row(categories='["Baby","Toddlers\' Room","Toddler Bedding","Toddler Bedding Sets & Sheets"]'),
 Row(categories='["Beauty","Beauty by Top Brands","Aveeno","Aveeno Hair Care"]'),
 Row(categories='["Beauty","Beauty by Top Brands","Clean & Clear"]'),
 Row(categories='["Beauty","Beauty by Top Brands","Cocokind"]'),
 Row(categories='["Beauty","Beauty by Top Brands","Maybelline New York",

In [46]:
df.describe().show()

+-------+--------------------+-----------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+----+-----+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------+--------------------+--------------------+
|summary|                 url|      final_price|                 sku|currency|                gtin|      specifications|          image_urls|         top_reviews|        rating_stars|       related_pages|  

In [61]:
df.select('rating_stars').show()

+--------------------+
|        rating_stars|
+--------------------+
|{"five_stars":2,"...|
|{"five_stars":47,...|
|    {"five_stars":2}|
|{"five_stars":78,...|
|{"five_stars":12,...|
|{"five_stars":63,...|
|{"five_stars":8,"...|
|{"five_stars":228...|
|{"five_stars":164...|
|{"five_stars":58,...|
|{"five_stars":534...|
|{"five_stars":42,...|
|{"five_stars":177...|
|    {"five_stars":8}|
|{"five_stars":7,"...|
|    {"five_stars":9}|
|{"five_stars":1,"...|
|{"five_stars":50,...|
|{"five_stars":424...|
|{"five_stars":71,...|
+--------------------+
only showing top 20 rows



In [66]:
from pyspark.sql.functions import col, expr, from_json
from pyspark.sql.types import MapType, StringType, IntegerType# Định nghĩa kiểu dữ liệu cho JSON
schema = MapType(StringType(), IntegerType())

# Chuyển đổi chuỗi JSON thành MapType
df = df.withColumn("rating_map", from_json(col("rating_stars"), schema))

# Tính tổng điểm và tổng số đánh giá
df = df.withColumn("total_points",
                   expr("COALESCE(rating_map['five_stars'],0) * 5 + "
                        "COALESCE(rating_map['four_stars'],0) * 4 + "
                        "COALESCE(rating_map['three_stars'],0) * 3 + "
                        "COALESCE(rating_map['two_stars'],0) * 2 + "
                        "COALESCE(rating_map['one_star'],0) * 1"))

df = df.withColumn("total_reviews",
                   expr("COALESCE(rating_map['five_stars'],0) + "
                        "COALESCE(rating_map['four_stars'],0) + "
                        "COALESCE(rating_map['three_stars'],0) + "
                        "COALESCE(rating_map['two_stars'],0) + "
                        "COALESCE(rating_map['one_star'],0)"))

# Tính trung bình rating
df = df.withColumn("average_rating", col("total_points") / col("total_reviews"))

# Hiển thị kết quả
df.select("rating_stars", "average_rating").show(truncate=False)

+-------------------------------------------------------------------------------------+-----------------+
|rating_stars                                                                         |average_rating   |
+-------------------------------------------------------------------------------------+-----------------+
|{"five_stars":2,"four_stars":3,"two_stars":1}                                        |4.0              |
|{"five_stars":47,"four_stars":4,"one_star":4,"three_stars":3}                        |4.551724137931035|
|{"five_stars":2}                                                                     |5.0              |
|{"five_stars":78,"four_stars":17,"one_star":2,"three_stars":3,"two_stars":1}         |4.663366336633663|
|{"five_stars":12,"one_star":1,"three_stars":1,"two_stars":1}                         |4.4              |
|{"five_stars":63,"four_stars":21,"one_star":5,"three_stars":5,"two_stars":1}         |4.431578947368421|
|{"five_stars":8,"four_stars":2,"one_star":1,"

In [72]:
from pyspark.sql.functions import col, regexp_replace, floor

# Loại bỏ dấu $ và chuyển về kiểu số
df = df.withColumn("discount", regexp_replace(col("discount"), "[$]", "").cast("double"))
df = df.withColumn("initial_price", regexp_replace(col("initial_price"), "[$]", "").cast("double"))

# Tính phần trăm giảm giá và làm tròn xuống số nguyên
df = df.withColumn("percent_discount", floor((col("discount") / col("initial_price")) * 100))

In [73]:
df.show()

+-------------------+--------------------+-----------+----------+--------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------------+------------------+--------------------+--------------------+------------+---------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------+--------------------+--------------------+------+----------+-----+-----+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------+--------------------+--------------------+--------------------+------------+-------------+-----------------+----------------+
|          timestamp|                 url|final_price|       sku|currency|        gtin|      specifications|          image_urls|   

Item sort by percentage \
Max discount percentage is 83% \
Item with no discount, default is 0%

In [86]:
df = df.fillna({'percent_discount': 0})  # Thay NaN bằng 0
sorted_df = df.orderBy(df['percent_discount'].desc())
sorted_df.select('percent_discount').describe().show()


+-------+------------------+
|summary|  percent_discount|
+-------+------------------+
|  count|              1000|
|   mean|            25.118|
| stddev|18.614364629784447|
|    min|                 0|
|    max|                83|
+-------+------------------+



In [83]:
df.select('final_price').describe().show()

+-------+-----------------+
|summary|      final_price|
+-------+-----------------+
|  count|             1000|
|   mean|37.98634000000022|
| stddev| 97.4809396210038|
|    min|             0.91|
|    max|          1685.99|
+-------+-----------------+



In [87]:
df.select('categories').show()

+--------------------+
|          categories|
+--------------------+
|["Beauty","Makeup...|
|["Home","Decor","...|
|["Clothing","Wome...|
|["Home","Bedding"...|
|["Clothing","Kids...|
|["Clothing","Wome...|
|["Clothing","Wome...|
|["Clothing","Wome...|
|["Clothing","Wome...|
|["Clothing","Mens...|
|["Beauty","Makeup...|
|["Home","Decor","...|
|["Clothing","Shoe...|
|["Home","Bedding"...|
|["Clothing","Wome...|
|["Clothing","Grap...|
|["Clothing","Wome...|
|["Clothing","Wome...|
|["Home","Bedding"...|
|["Sports & Outdoo...|
+--------------------+
only showing top 20 rows



In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, StringType

def count_products_by_main_category(df):
    # Định nghĩa schema cho cột JSON
    schema = ArrayType(StringType())
    
    # Chuyển đổi chuỗi JSON thành danh sách thực tế
    df = df.withColumn("categories", from_json(col("categories"), schema))
    
    # Lấy category đầu tiên (category cha)
    df = df.withColumn("main_category", col("categories")[0])
    
    # Nhóm theo category cha và đếm số lượng sản phẩm
    category_counts = df.groupBy("main_category").count()
    
    return category_counts

new_df = count_products_by_main_category(df)
new_df.show()


+--------------------+-----+
|       main_category|count|
+--------------------+-----+
|    Home Improvement|    3|
|                Home|  220|
|                Food|   29|
|       Subscriptions|    1|
|      Premium Beauty|    5|
|            Seasonal|    1|
|                Baby|   13|
|        Auto & Tires|    1|
|   Sports & Outdoors|   12|
|Household Essentials|    3|
|         Electronics|    1|
|   Shop with Purpose|    1|
|            Clothing|  594|
|      Patio & Garden|   14|
|                Pets|   17|
| Health and Medicine|   14|
|        Collectibles|    1|
|       Personal Care|   16|
|             Jewelry|    9|
|              Beauty|   31|
+--------------------+-----+
only showing top 20 rows



In [10]:
from pyspark.sql.functions import col, from_json, size, max

def max_category_count(df):
    # Định nghĩa schema cho cột JSON
    schema = ArrayType(StringType())
    
    # Chuyển đổi chuỗi JSON thành danh sách thực tế
    df = df.withColumn("categories", from_json(col("categories"), schema))
    
    # Tính số lượng category của mỗi item
    df = df.withColumn("category_count", size(col("categories")))
    
    # Lấy giá trị lớn nhất
    max_count = df.agg(max("category_count")).collect()[0][0]
    
    return max_count

max_count  = max_category_count(df)
max_count

7

In [None]:
df = df.fillna({'percent_discount': 0})  # Thay NaN bằng 0
sorted_df = df.orderBy(df['percent_discount'].desc())
sorted_df.select('percent_discount').describe().show()


+-------+------------------+
|summary|  percent_discount|
+-------+------------------+
|  count|              1000|
|   mean|            25.118|
| stddev|18.614364629784447|
|    min|                 0|
|    max|                83|
+-------+------------------+



In [None]:
df = df.fillna({'percent_discount': 0})  # Thay NaN bằng 0
sorted_df = df.orderBy(df['percent_discount'].desc())
sorted_df.select('percent_discount').describe().show()


+-------+------------------+
|summary|  percent_discount|
+-------+------------------+
|  count|              1000|
|   mean|            25.118|
| stddev|18.614364629784447|
|    min|                 0|
|    max|                83|
+-------+------------------+



In [None]:
df = df.fillna({'percent_discount': 0})  # Thay NaN bằng 0
sorted_df = df.orderBy(df['percent_discount'].desc())
sorted_df.select('percent_discount').describe().show()


+-------+------------------+
|summary|  percent_discount|
+-------+------------------+
|  count|              1000|
|   mean|            25.118|
| stddev|18.614364629784447|
|    min|                 0|
|    max|                83|
+-------+------------------+



In [18]:
count_df = df.groupBy("available_for_delivery").count()
count_df.show()

+----------------------+-----+
|available_for_delivery|count|
+----------------------+-----+
|                  true|  977|
|                 false|   23|
+----------------------+-----+



In [19]:
count_df = df.groupBy("available_for_pickup").count()
count_df.show()

+--------------------+-----+
|available_for_pickup|count|
+--------------------+-----+
|                true|   48|
|               false|  952|
+--------------------+-----+



In [24]:
brand_count = df.select("brand").distinct().count()
brand_count 

505

we can see that there is 505 brands in total 1000 item, so that the brand is not a good feature for the model 

In [26]:
specification_df = df.select("specifications")
specification_df.show()

+--------------------+
|      specifications|
+--------------------+
|[{"name":"Brand",...|
|[{"name":"Brand",...|
|[{"name":"Feature...|
|[{"name":"Brand",...|
|[{"name":"Country...|
|[{"name":"Clothin...|
|[{"name":"Feature...|
|[{"name":"Feature...|
|[{"name":"Feature...|
|[{"name":"Brand",...|
|[{"name":"Brand",...|
|[{"name":"Country...|
|[{"name":"Brand",...|
|[{"name":"Brand",...|
|[{"name":"Feature...|
|[{"name":"Country...|
|[{"name":"Feature...|
|[{"name":"Country...|
|[{"name":"Brand",...|
|[{"name":"Country...|
+--------------------+
only showing top 20 rows



In [41]:
from pyspark.sql.functions import col, from_json, explode
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# Định nghĩa schema cho specifications (mảng các struct chứa name và value)
schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("value", StringType(), True)
]))

# Chuyển cột specifications từ STRING thành JSON ARRAY
df_parsed = df.withColumn("specifications", from_json(col("specifications"), schema))

# Giải nén danh sách specifications và chỉ lấy "name"
df_keys = df_parsed.withColumn("name", explode(col("specifications.name"))).select("name").distinct()

# Chuyển về danh sách Python
key_set = set(df_keys.toPandas()["name"].tolist())

# In ra kết quả
key_set

{'Absorbency',
 'Age Group',
 'Age Range',
 'Allergen-Free Statements',
 'Animal Breed',
 'Animal Lifestage',
 'Animal Type',
 'Animal Welfare Claims',
 'Assembled Product Dimensions (L x W x H)',
 'Assembled Product Weight',
 'Assembly Instructions',
 'Baby Food Packaging',
 'Baby Food Stage',
 'Backing Material',
 'Battery Life',
 'Bed Frame Type',
 'Bed Headboard & Footboard Type',
 'Bed Pillow Size',
 'Bed Sheet Type',
 'Bed Size',
 'Bed Type',
 'Bedding Set Type',
 'Body Part',
 'Body Wash & Shower Gel Type',
 'Boot Shaft Width Style',
 'Boot Style',
 'Brand',
 'Burn Time',
 'Caffeine Designation',
 'Candle Type',
 'Candle Wax Type',
 'Carats',
 'Casual & Dress Shoe Style',
 'Character',
 'Cleaning, Care & Maintenance',
 'Clothing Fit',
 'Clothing Length Style',
 'Clothing Neck Style',
 'Clothing Size',
 'Clothing Size Group',
 'Clothing Style',
 'Coffee Bean Origin',
 'Collection',
 'Color',
 'Color Category',
 'Condition',
 'Container Material',
 'Container Type',
 'Count',
 'Co