In [1]:
%cd spark/hw-3

/home/vdubyna/Workspace/Jupyter/spark/hw-3


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, round as spark_round

# === 1. Створення Spark-сесії ===
spark = (
    SparkSession.builder
    .appName("ProductCategoryAnalysis")
    .master("spark://10.0.1.28:7077")  # використовую локальний інстанс для виконання
    .getOrCreate()
)

25/08/24 08:56:08 WARN Utils: Your hostname, vdubyna-X670E-Steel-Legend resolves to a loopback address: 127.0.1.1; using 10.0.1.28 instead (on interface wlp112s0)
25/08/24 08:56:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/24 08:56:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:

# === 2. Завантаження CSV-файлів ===
users_df = spark.read.csv("data/users.csv", header=True, inferSchema=True)
purchases_df = spark.read.csv("data/purchases.csv", header=True, inferSchema=True)
products_df = spark.read.csv("data/products.csv", header=True, inferSchema=True)

# === 2.1. Очищення даних (видалення рядків з пропущеними значеннями) ===
users_df = users_df.dropna()
purchases_df = purchases_df.dropna()
products_df = products_df.dropna()
full_df = purchases_df \
    .join(users_df, on="user_id", how="inner") \
    .join(products_df, on="product_id", how="inner")
# === 2.2 Додаємо загальну ціну для спрощення подальших розрахунків ===
full_df = full_df.withColumn("total_price", col("quantity") * col("price"))
full_df.show()

+----------+-------+-----------+----------+--------+--------+---+-------------------+------------+-----------+-----+------------------+
|product_id|user_id|purchase_id|      date|quantity|    name|age|              email|product_name|   category|price|       total_price|
+----------+-------+-----------+----------+--------+--------+---+-------------------+------------+-----------+-----+------------------+
|         9|     52|          1|2022-01-01|       1| User_52| 39| user52@example.com|   Product_9|     Beauty|  6.0|               6.0|
|        37|     93|          2|2022-01-02|       8| User_93| 25| user93@example.com|  Product_37|   Clothing|  6.0|              48.0|
|        33|     15|          3|2022-01-03|       1| User_15| 30| user15@example.com|  Product_33|       Home|  9.4|               9.4|
|        42|     72|          4|2022-01-04|       9| User_72| 39| user72@example.com|  Product_42|     Beauty|  9.1| 81.89999999999999|
|        24|     21|          6|2022-01-06|     

In [12]:
# === 3: Загальна сума покупок за категорією ===
df_total_by_category = full_df.groupBy("category") \
    .agg(spark_round(spark_sum("total_price"), 2).alias("total_spent"))
df_total_by_category.show()

+-----------+-----------+
|   category|total_spent|
+-----------+-----------+
|       Home|     1438.9|
|     Sports|     1755.5|
|Electronics|     1141.9|
|   Clothing|      696.1|
|     Beauty|      441.7|
+-----------+-----------+



In [13]:
# === 4: Сума покупок для віку 18–25 ===
age_filtered_df = full_df.filter((col("age") >= 18) & (col("age") <= 25))
df_18_25_by_category = age_filtered_df.groupBy("category") \
    .agg(spark_round(spark_sum("total_price"), 2).alias("total_spent_18_25"))
df_18_25_by_category.show()

+-----------+-----------------+
|   category|total_spent_18_25|
+-----------+-----------------+
|       Home|            361.1|
|     Sports|            310.5|
|Electronics|            249.6|
|   Clothing|            245.0|
|     Beauty|             41.4|
+-----------+-----------------+



In [14]:
# === 5: Частка покупок у % ===
total_18_25 = age_filtered_df.agg(spark_sum("total_price").alias("total")).collect()[0]["total"]
df_percentages = df_18_25_by_category.withColumn(
    "percentage",
    spark_round((col("total_spent_18_25") / total_18_25) * 100, 2)
)
df_percentages.show()

+-----------+-----------------+----------+
|   category|total_spent_18_25|percentage|
+-----------+-----------------+----------+
|       Home|            361.1|      29.9|
|     Sports|            310.5|     25.71|
|Electronics|            249.6|     20.67|
|   Clothing|            245.0|     20.29|
|     Beauty|             41.4|      3.43|
+-----------+-----------------+----------+



In [15]:
# === 6: Топ-3 категорії за % витрат ===
df_percentages.orderBy(col("percentage").desc()).limit(3).show()

+-----------+-----------------+----------+
|   category|total_spent_18_25|percentage|
+-----------+-----------------+----------+
|       Home|            361.1|      29.9|
|     Sports|            310.5|     25.71|
|Electronics|            249.6|     20.67|
+-----------+-----------------+----------+



In [19]:
# === Теж саме з використанням SQL

# Створення тимчасових представлень
users_df.createOrReplaceTempView("users")
purchases_df.createOrReplaceTempView("purchases")
products_df.createOrReplaceTempView("products")

In [20]:
# Обʼєднаний вигляд із total_price
spark.sql("""
    CREATE OR REPLACE TEMP VIEW full_data AS
    SELECT
        u.user_id,
        u.age,
        p.product_id,
        pr.category,
        pr.price,
        p.quantity,
        (p.quantity * pr.price) AS total_price
    FROM purchases p
    JOIN users u ON p.user_id = u.user_id
    JOIN products pr ON p.product_id = pr.product_id
""")

DataFrame[]

In [21]:

# Крок 3: Загальна сума покупок за категоріями
spark.sql("""
    SELECT category, ROUND(SUM(total_price), 2) AS total_spent
    FROM full_data
    GROUP BY category
    ORDER BY total_spent DESC
""").show()

+-----------+-----------+
|   category|total_spent|
+-----------+-----------+
|     Sports|     1755.5|
|       Home|     1438.9|
|Electronics|     1141.9|
|   Clothing|      696.1|
|     Beauty|      441.7|
+-----------+-----------+



In [23]:

# Крок 4: Сума покупок за категоріями для 18–25
spark.sql("""
    CREATE OR REPLACE TEMP VIEW age_18_25 AS
    SELECT *
    FROM full_data
    WHERE age BETWEEN 18 AND 25
""")

spark.sql("""
    SELECT category, ROUND(SUM(total_price), 2) AS total_spent_18_25
    FROM age_18_25
    GROUP BY category
    ORDER BY total_spent_18_25 DESC
""").show()

+-----------+-----------------+
|   category|total_spent_18_25|
+-----------+-----------------+
|       Home|            361.1|
|     Sports|            310.5|
|Electronics|            249.6|
|   Clothing|            245.0|
|     Beauty|             41.4|
+-----------+-----------------+



In [24]:
# Крок 5: Частка від загальної суми
spark.sql("""
    CREATE OR REPLACE TEMP VIEW category_percentages AS
    SELECT
        category,
        ROUND(SUM(total_price), 2) AS total_spent_18_25,
        ROUND(SUM(total_price) / (SELECT SUM(total_price) FROM age_18_25) * 100, 2) AS percentage
    FROM age_18_25
    GROUP BY category
""")

spark.sql("SELECT * FROM category_percentages ORDER BY percentage DESC").show()

+-----------+-----------------+----------+
|   category|total_spent_18_25|percentage|
+-----------+-----------------+----------+
|       Home|            361.1|      29.9|
|     Sports|            310.5|     25.71|
|Electronics|            249.6|     20.67|
|   Clothing|            245.0|     20.29|
|     Beauty|             41.4|      3.43|
+-----------+-----------------+----------+



In [25]:
# Крок 6: Топ-3 категорії
spark.sql("""
    SELECT *
    FROM category_percentages
    ORDER BY percentage DESC
    LIMIT 3
""").show()

+-----------+-----------------+----------+
|   category|total_spent_18_25|percentage|
+-----------+-----------------+----------+
|       Home|            361.1|      29.9|
|     Sports|            310.5|     25.71|
|Electronics|            249.6|     20.67|
+-----------+-----------------+----------+



In [26]:
spark.stop()