CÂU 1: Hãy đọc dữ liệu từ các file csv, sử dụng tự suy ra kiểu dữ liệu cho mỗi cột.

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Lab3').getOrCreate()

orders_df = spark.read.format("csv").options(header='True', delimiter=';', inferSchema='True').load("Orders.csv")
customers_df = spark.read.format("csv").options(header='True', delimiter=';', inferSchema='True').load("Customer_List.csv")
order_items_df = spark.read.format("csv").options(header='True', delimiter=';', inferSchema='True').load("Order_Items.csv")
products_df = spark.read.format("csv").options(header='True', delimiter=';', inferSchema='True').load("Products.csv")
reviews_df = spark.read.format("csv").options(header='True', delimiter=';', inferSchema='True').load("Order_Reviews.csv")

orders_df.printSchema()
customers_df.printSchema()
order_items_df.printSchema()
products_df.printSchema()
reviews_df.printSchema()

customers_df.show(5)
orders_df.show(5)
order_items_df.show(5)
products_df.show(5)
reviews_df.show(5)

root
 |-- Order_ID: string (nullable = true)
 |-- Customer_Trx_ID: string (nullable = true)
 |-- Order_Status: string (nullable = true)
 |-- Order_Purchase_Timestamp: timestamp (nullable = true)
 |-- Order_Approved_At: timestamp (nullable = true)
 |-- Order_Delivered_Carrier_Date: timestamp (nullable = true)
 |-- Order_Delivered_Customer_Date: timestamp (nullable = true)
 |-- Order_Estimated_Delivery_Date: timestamp (nullable = true)

root
 |-- Customer_Trx_ID: string (nullable = true)
 |-- Subscriber_ID: string (nullable = true)
 |-- Subscribe_Date: date (nullable = true)
 |-- First_Order_Date: date (nullable = true)
 |-- Customer_Postal_Code: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Customer_Country_Code: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)

root
 |-- Order_ID: string (nullable = true)
 |-- Order_Item_ID: integer (nullable = true)
 |-- Produ

CÂU 2: Thống kê tổng số đơn hàng, số lượng khách hàng và người bán.

In [9]:
total_orders = orders_df.select("Order_ID").count()
total_customers = customers_df.select("Customer_Trx_ID").distinct().count()
total_sellers = order_items_df.select("Seller_ID").distinct().count()

print(f"Tổng số đơn hàng: {total_orders}")
print(f"Tổng số khách hàng: {total_customers}")
print(f"Tổng số người bán: {total_sellers}")

Tổng số đơn hàng: 99441
Tổng số khách hàng: 99442
Tổng số người bán: 3095


CÂU 3: Phân tích số lượng đơn hàng theo quốc gia, sắp xếp theo thứ tự giảm dần

In [18]:
from pyspark.sql.functions import col

orders_country = orders_df.join(
    customers_df,
    on="Customer_Trx_ID",
    how="left"
)

result = orders_country.groupBy("Customer_Country") \
    .count() \
    .orderBy(col("count").desc())

result.show(50)


+----------------+-----+
|Customer_Country|count|
+----------------+-----+
|         Germany|41754|
|          France|12848|
|     Netherlands|11629|
|         Belgium| 5464|
|         Austria| 5043|
|     Switzerland| 3640|
|  United Kingdom| 3382|
|          Poland| 2139|
|         Czechia| 2034|
|           Italy| 2025|
|           Spain| 1651|
|        Portugal| 1336|
|          Sweden|  975|
|         Denmark|  905|
|          Serbia|  746|
|          Norway|  716|
|        Slovakia|  534|
|        Slovenia|  495|
|          Turkey|  485|
|          Greece|  412|
|       Lithuania|  351|
|          Latvia|  280|
|         Croatia|  254|
|         Estonia|  148|
|         Finland|   81|
|      Luxembourg|   68|
|         Andorra|   46|
+----------------+-----+



CÂU 4: Phân tích số lượng đơn hàng nhóm theo năm, tháng đặt hàng (Hiển thị theo năm
tăng dần, tháng giảm dần)

In [55]:
from pyspark.sql.functions import month, year, count, asc, desc

orders_by_time = orders_df.withColumn("order_year", year("Order_Purchase_Timestamp")) \
                          .withColumn("order_month", month("Order_Purchase_Timestamp")) \
                          .groupBy("order_year", "order_month") \
                          .agg(count("Order_ID"))\
                          .orderBy(asc("order_year"), desc("order_month"))
print("Số đơn hàng đặt theo năm tăng dần, tháng giảm dần")
orders_by_time.show(50)

Số đơn hàng đặt theo năm tăng dần, tháng giảm dần
+----------+-----------+---------------+
|order_year|order_month|count(Order_ID)|
+----------+-----------+---------------+
|      2022|         12|              1|
|      2022|         10|            324|
|      2022|          9|              4|
|      2023|         12|           5673|
|      2023|         11|           7544|
|      2023|         10|           4631|
|      2023|          9|           4285|
|      2023|          8|           4331|
|      2023|          7|           4026|
|      2023|          6|           3245|
|      2023|          5|           3700|
|      2023|          4|           2404|
|      2023|          3|           2682|
|      2023|          2|           1780|
|      2023|          1|            800|
|      2024|         10|              4|
|      2024|          9|             16|
|      2024|          8|           6512|
|      2024|          7|           6292|
|      2024|          6|           6167|
|      

CÂU 5: Thống kê điểm đánh giá trung bình, số lượng đánh giá theo từng mức (ví dụ: 1 đến
5)

In [68]:
from pyspark.sql.functions import avg, col, count

#Do dữ liệu Review_Score đang là kiểu string nên phải chuyển về kiểu int để tính trung bình
reviews_df_int = reviews_df.withColumn("Review_Score", reviews_df["Review_Score"].cast("int"))

average_review_score = reviews_df_int \
    .filter(col("Review_Score").isNotNull()) \
    .select(avg("Review_Score"))

print("Điểm đánh giá trung bình:")
average_review_score.show()

number_of_reviews_by_score = reviews_df_int \
    .filter(col("Review_Score").isNotNull()) \
    .groupBy("Review_Score") \
    .agg(count("*").alias("Review Count")) \
    .orderBy("Review_Score")

print("Số lượng đánh giá theo từng mức:")
number_of_reviews_by_score.show()

Điểm đánh giá trung bình:
+------------------+
| avg(Review_Score)|
+------------------+
|4.0864214950162765|
+------------------+

Số lượng đánh giá theo từng mức:
+------------+------------+
|Review_Score|Review Count|
+------------+------------+
|           1|       11424|
|           2|        3151|
|           3|        8179|
|           4|       19141|
|           5|       57328|
+------------+------------+



CÂU 6: Tính doanh thu (giá sản phẩm + phí vận chuyển) trong năm 2024 và nhóm theo
danh mục sản phẩm

In [62]:
from pyspark.sql.functions import year, col, sum as spark_sum

# Lọc đơn hàng trong năm 2024
df_2024_orders = orders_df.join(
    order_items_df,
    on="Order_ID",
    how="inner"
    ).join(
        products_df,
        on="Product_ID",
        how="left"
    ).filter(
        (year(col("Order_Purchase_Timestamp")) == 2024)
    )

# Tính doanh thu
revenue_by_category = (
    df_2024_orders
    .withColumn("revenue", col("price") + col("freight_value"))
    .groupBy("product_category_name")
    .agg(spark_sum("revenue").alias("Revenue"))
)

revenue_by_category.show(50, truncate=False)


+---------------------------------------+------------------+
|product_category_name                  |Revenue           |
+---------------------------------------+------------------+
|Kitchen_Dining_Laundry_Garden_Furniture|34951.24999999999 |
|Fashion_Male_Clothing                  |2810.4100000000003|
|Stationery                             |164743.84999999986|
|Books_Technical                        |16570.430000000004|
|Food                                   |25709.750000000007|
|Housewares                             |491576.9600000012 |
|Books_Imported                         |4090.710000000001 |
|Perfumery                              |204562.53999999992|
|Art                                    |17947.630000000005|
|Fashion_Bags_Accessories               |85091.27999999997 |
|Computers                              |67860.8           |
|Cool_Stuff                             |273910.0500000001 |
|Furniture_Living_Room                  |42774.14000000003 |
|Consoles_Games         