# goit-de-hw-03. Аналіз даних у PySpark

## Завдання


**Початкові дані**

Вхідні CSV-файли:

1. [users.csv](csv/users.csv) — дані користувачів:

- `user_id` (унікальний ідентифікатор користувача)
- `name` (ім'я користувача)
- `age` (вік користувача)
- `email` (електронна адреса)

2. [purchases.csv](csv/purchases.csv) — дані про покупки:

- `purchase_id` (унікальний ідентифікатор покупки)
- `user_id` (ідентифікатор користувача, який зробив покупку)
- `product_id` (унікальний ідентифікатор продукту)
- `date` (дата покупки)
- `quantity` (кількість одиниць купленого товару)

3. [products.csv)](csv/products.csv) — інформація про продукти:

- `product_id` (унікальний ідентифікатор продукту)
- `product_name` (назва продукту)
- `category` (категорія продукту)
- `price` (ціна одиниці товару)

**Завдання**

1. Завантажте та прочитайте кожен CSV-файл як окремий DataFrame.
2. Очистіть дані, видаляючи будь-які рядки з пропущеними значеннями.
3. Визначте загальну суму покупок за кожною категорією продуктів.
4. Визначте суму покупок за кожною категорією продуктів для вікової категорії від 18 до 25 включно.
5. Визначте частку покупок за кожною категорією товарів від сумарних витрат для вікової категорії від 18 до 25 років.
6. Виберіть 3 категорії продуктів з найвищим відсотком витрат споживачами віком від 18 до 25 років.

   💡 Відсоток потрібно округлити до другого знака після коми.

## Рішення


### 1. Завантажте та прочитайте кожен CSV-файл як окремий DataFrame.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, sum as _sum

# Створюємо сесію Spark
spark = SparkSession.builder.appName("goit-de-hw-03").getOrCreate()


# Завантажуємо датасет в окремі DataFrame
df_users = spark.read.csv("./csv/users.csv", header=True, inferSchema=True)
df_products = spark.read.csv("./csv/products.csv", header=True, inferSchema=True)
df_purchases = spark.read.csv("./csv/purchases.csv", header=True, inferSchema=True)


# Структура даних
df_users.printSchema()
df_products.printSchema()
df_purchases.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- email: string (nullable = true)

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)

root
 |-- purchase_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- quantity: integer (nullable = true)



### 2. Очистіть дані, видаляючи будь-які рядки з пропущеними значеннями.

In [2]:
print('Кількість записів до очищення:')
print(f"{df_users.count() = }")
print(f"{df_products.count() = }")
print(f"{df_purchases.count() = }")

# Видаляємо будь-які рядки з пропущеними значеннями.
df_users = df_users.dropna()
df_products = df_products.dropna()
df_purchases = df_purchases.dropna()

print("Кількість записів після очищення:")
print(f"{df_users.count() = }")
print(f"{df_products.count() = }")
print(f"{df_purchases.count() = }")

Кількість записів до очищення:
df_users.count() = 100
df_products.count() = 50
df_purchases.count() = 200
Кількість записів після очищення:
df_users.count() = 95
df_products.count() = 47
df_purchases.count() = 195


### 3. Визначте загальну суму покупок за кожною категорією продуктів.

Об'єднаємо датафреми в один і додамо поле загальної суми покупки.


In [3]:
# Об'єднаємо дані і додамо загальну суму покупки
df_joined = (
    df_users.join(df_purchases, "user_id")
    .join(df_products, "product_id")
    .withColumn("total_cost", col("price") * col("quantity"))
)

df_joined.show()
# df_joined.printSchema()

+----------+-------+--------+---+-------------------+-----------+----------+--------+------------+-----------+-----+------------------+
|product_id|user_id|    name|age|              email|purchase_id|      date|quantity|product_name|   category|price|        total_cost|
+----------+-------+--------+---+-------------------+-----------+----------+--------+------------+-----------+-----+------------------+
|         9|     52| User_52| 39| user52@example.com|          1|2022-01-01|       1|   Product_9|     Beauty|  6.0|               6.0|
|        37|     93| User_93| 25| user93@example.com|          2|2022-01-02|       8|  Product_37|   Clothing|  6.0|              48.0|
|        33|     15| User_15| 30| user15@example.com|          3|2022-01-03|       1|  Product_33|       Home|  9.4|               9.4|
|        42|     72| User_72| 39| user72@example.com|          4|2022-01-04|       9|  Product_42|     Beauty|  9.1| 81.89999999999999|
|        24|     21| User_21| 37| user21@example

Визначимо загальну суму покупок по категоріям за допомогою функцій групування та агрегації

In [4]:
# Визначимо загальну суму покупок за кожною категорією продуктів

df_joined.groupBy("category").agg(
    round(_sum("total_cost"), 2).alias("total_sum")
).show()

+-----------+---------+
|   category|total_sum|
+-----------+---------+
|       Home|   1438.9|
|     Sports|   1755.5|
|Electronics|   1141.9|
|   Clothing|    696.1|
|     Beauty|    441.7|
+-----------+---------+



### 4. Визначте суму покупок за кожною категорією продуктів для вікової категорії від 18 до 25 включно.

In [5]:
# Створимо цільовий датафрейм
df_18_25 = df_joined.filter(df_joined.age >= 18).filter(df_joined.age <= 25)
# df_18_25.show()

In [6]:
# Визначимо суму покупок за кожною категорією продуктів для вікової категорії від 18 до 25 включно.

df_18_25.groupBy("category").agg(
    round(_sum("total_cost"), 2).alias("total_sum_18_25")
).show()

+-----------+---------------+
|   category|total_sum_18_25|
+-----------+---------------+
|       Home|          361.1|
|     Sports|          310.5|
|Electronics|          249.6|
|   Clothing|          245.0|
|     Beauty|           41.4|
+-----------+---------------+



### 5. Визначте частку покупок за кожною категорією товарів від сумарних витрат для вікової категорії від 18 до 25 років.

In [7]:
# Сумарні витрати
total_spending_18_25 = df_18_25.agg(
    _sum("total_cost").alias("total_spending")
).collect()[0]["total_spending"]
total_spending_18_25

1207.6

In [8]:
# Визначимо частку покупок за кожною категорією товарів від сумарних витрат для вікової категорії від 18 до 25 років
df_18_25.groupBy("category").agg(
    round(_sum("total_cost"), 2).alias("total_sum_18_25"),
    round(_sum("total_cost") / total_spending_18_25 * 100, 2).alias("part_sum_18_25"),
).show()

+-----------+---------------+--------------+
|   category|total_sum_18_25|part_sum_18_25|
+-----------+---------------+--------------+
|       Home|          361.1|          29.9|
|     Sports|          310.5|         25.71|
|Electronics|          249.6|         20.67|
|   Clothing|          245.0|         20.29|
|     Beauty|           41.4|          3.43|
+-----------+---------------+--------------+



In [9]:
# 6 Виберіть 3 категорії продуктів з найвищим відсотком витрат споживачами віком від 18 до 25 років.
df_18_25.groupBy("category").agg(
    round(_sum("total_cost"), 2).alias("total_sum_18_25"),
    round(_sum("total_cost") / total_spending_18_25 * 100, 2).alias("part_sum_18_25"),
).sort("part_sum_18_25", ascending=False).limit(3).show()

+-----------+---------------+--------------+
|   category|total_sum_18_25|part_sum_18_25|
+-----------+---------------+--------------+
|       Home|          361.1|          29.9|
|     Sports|          310.5|         25.71|
|Electronics|          249.6|         20.67|
+-----------+---------------+--------------+



In [10]:
# 6 Виберіть 3 категорії продуктів з найвищим відсотком витрат споживачами віком від 18 до 25 років.
df_joined.filter(df_joined.age >= 18).filter(df_joined.age <= 25).groupBy(
    "category"
).agg(
    round(_sum("total_cost"), 2).alias("total_sum_18_25"),
    round(_sum("total_cost") / total_spending_18_25 * 100, 2).alias("part_sum_18_25"),
).sort(
    "part_sum_18_25", ascending=False
).limit(
    3
).show()

+-----------+---------------+--------------+
|   category|total_sum_18_25|part_sum_18_25|
+-----------+---------------+--------------+
|       Home|          361.1|          29.9|
|     Sports|          310.5|         25.71|
|Electronics|          249.6|         20.67|
+-----------+---------------+--------------+



In [11]:
# Зупиняємо Spark
spark.stop()