# SPARK

In [323]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
# from pyspark.sql.window import Window as W

In [324]:
def to_snake(name: str) -> str:
    """Функция для очистки строк от лишних пробелов и приведения к единому регистру. Применяется для обработки заголовка в DataFrame"""
    return name.strip().lower().replace(" ", "_")

In [325]:
# Создание Spark сессии
spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()

## 1. Загрузка и предварительная обработка данных

1.1. Загрузка и вывод схемы

In [326]:
data_path = "retail_store_sales.csv"

In [327]:
df_retail = spark.read.csv(data_path, header=True, inferSchema=True)

In [328]:
df_retail.printSchema()

root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Price Per Unit: double (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- Total Spent: double (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Transaction Date: date (nullable = true)
 |-- Discount Applied: boolean (nullable = true)



1.2. Очистка названий столбцов

In [329]:
for old_name in df_retail.columns:
    df_retail = df_retail.withColumnRenamed(old_name, to_snake(old_name))

In [330]:
df_retail.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)



1.3. Преобразование типов данных

In [331]:
df_retail = (
    df_retail.withColumn("transaction_id", F.col("transaction_id").cast(StringType()))
    .withColumn("customer_id", F.col("customer_id").cast(StringType()))
    .withColumn("category", F.col("category").cast(StringType()))
    .withColumn("item", F.col("item").cast(StringType()))
    .withColumn("price_per_unit", F.col("price_per_unit").cast(DoubleType()))
    .withColumn("quantity", F.col("quantity").cast(IntegerType()))
    .withColumn("total_spent", F.col("total_spent").cast(DoubleType()))
    .withColumn("payment_method", F.col("payment_method").cast(StringType()))
    .withColumn("location", F.col("location").cast(StringType()))
    .withColumn("transaction_date", F.col("transaction_date").cast(DateType()))
    .withColumn("discount_applied", F.col("discount_applied").cast(BooleanType()))
)

df_retail.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: string (nullable = true)
 |-- price_per_unit: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- location: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- discount_applied: boolean (nullable = true)



## 2. Очистка и валидация данных

2.1. Восстановление отсутствующих 'item'

In [332]:
df_product_directory = df_retail.select(
    F.col("category").alias("category_ref"),
    F.col("item").alias("item_ref"),
    F.col("price_per_unit").alias("price_per_unit_ref")
).dropna(subset=["category_ref", "item_ref"]).distinct()

df_product_directory.printSchema()

root
 |-- category_ref: string (nullable = true)
 |-- item_ref: string (nullable = true)
 |-- price_per_unit_ref: double (nullable = true)



In [333]:
df_retail = (
    df_retail.join(df_product_directory, (df_retail.category == df_product_directory.category_ref) & (df_retail.price_per_unit == df_product_directory.price_per_unit_ref), "inner")
    .withColumn("item", F.when(F.col("item").isNull(), F.col("item_ref")).otherwise(F.col("item")))
    .drop("category_ref", "item_ref", "price_per_unit_ref")
)

2.2. Восстановление 'total_spent'

In [334]:
df_retail = (
    df_retail.withColumn(
        "total_spent", F.when(
            F.col("total_spent").isNull() & F.col("quantity").isNotNull() & F.col("price_per_unit").isNotNull(),
            F.col("quantity") * F.col("price_per_unit")
        ).otherwise(
            F.col("total_spent")
        )
    )
)

2.3. Восстановление 'quantity' и 'price_per_unit',

In [335]:
df_retail = df_retail.withColumn(
    "quantity", F.when(
        F.col("quantity").isNull() & F.col("price_per_unit").isNotNull() & F.col("total_spent").isNull(),
        F.round(F.col("total_spent") / F.col("price_per_unit"), 0)
    ).otherwise(
        F.col("quantity")
    )
).withColumn(
    "quantity",
    F.col("quantity").cast(IntegerType())
)

df_retail = df_retail.withColumn(
    "price_per_unit", F.when(
        F.col("price_per_unit").isNull() & F.col("total_spent").isNotNull() & F.col("quantity").isNotNull(),
        F.round(F.col("total_spent") / F.col("quantity"), 2)
    ).otherwise(F.round(F.col("price_per_unit"), 2))
)

2.4. Удаление оставшихся строк с пропусками

In [336]:
df_retail = df_retail.dropna(
    how="any",
    subset=["category", "quantity", "total_spent", "price_per_unit"]
)
df_retail.show()

+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|transaction_id|customer_id|            category|        item|price_per_unit|quantity|total_spent|payment_method|location|transaction_date|discount_applied|
+--------------+-----------+--------------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|   TXN_6867343|    CUST_09|          Patisserie| Item_10_PAT|          18.5|      10|      185.0|Digital Wallet|  Online|      2024-04-08|            true|
|   TXN_3731986|    CUST_22|       Milk Products|Item_17_MILK|          29.0|       9|      261.0|Digital Wallet|  Online|      2023-07-23|            true|
|   TXN_9303719|    CUST_02|            Butchers| Item_12_BUT|          21.5|       2|       43.0|   Credit Card|  Online|      2022-10-05|           false|
|   TXN_9458126|    CUST_06|           Beverages| Item_16_

## 3. Разведочный анализ данных

3.1. Самые популярные категории товаров

In [337]:
# Общее количество проданных товаров по категориям
total_category_df = df_retail.groupBy(F.col("category")).agg(
    F.sum("quantity").alias("total_in_category")
)
total_category_df = total_category_df.orderBy(F.desc("total_in_category"))
total_category_df.show()

+--------------------+-----------------+
|            category|total_in_category|
+--------------------+-----------------+
|           Furniture|             8083|
|           Beverages|             7974|
|                Food|             7925|
|Electric househol...|             7897|
|       Milk Products|             7889|
|Computers and ele...|             7832|
|            Butchers|             7774|
|          Patisserie|             7515|
+--------------------+-----------------+



In [338]:
# ТОП-5 категорий по общему количеству проданных единиц
total_category_df.show(5)

+--------------------+-----------------+
|            category|total_in_category|
+--------------------+-----------------+
|           Furniture|             8083|
|           Beverages|             7974|
|                Food|             7925|
|Electric househol...|             7897|
|       Milk Products|             7889|
+--------------------+-----------------+
only showing top 5 rows



3.2. Анализ среднего чека

In [339]:
# Среднее значение 'total_spent' для каждого метода оплаты
avg_total_spent = df_retail.groupBy("payment_method").agg(
    F.round(F.avg("total_spent"), 2).alias("avg_total_spent")
)
avg_total_spent.orderBy("avg_total_spent").show()

+--------------+---------------+
|payment_method|avg_total_spent|
+--------------+---------------+
|Digital Wallet|         128.68|
|   Credit Card|         129.03|
|          Cash|         131.14|
+--------------+---------------+



In [340]:
# Среднее значение 'total_spent' для каждого места, где прошла оплата
avg_location_total_spent = df_retail.groupBy("location").agg(
    F.round(F.avg("total_spent"), 2).alias("avg_location_total_spent")
)
avg_location_total_spent.orderBy("avg_location_total_spent").show()

+--------+------------------------+
|location|avg_location_total_spent|
+--------+------------------------+
|In-store|                  128.82|
|  Online|                  130.45|
+--------+------------------------+



## 4. Генерация признаков

4.1. Временные признаки. Добавление новых столбцов: день недели и месяца

In [341]:
df_retail = df_retail.withColumn(
    "day_of_week", F.dayofweek(F.col("transaction_date"))
).withColumn(
    "transaction_month", F.month(F.col("transaction_date"))
)

4.2. Продажи по дням недели

In [342]:
avg_total_day_of_week = df_retail.groupBy("day_of_week").agg(
    F.round(F.avg(F.col("total_spent")), 2).alias("avg_day_of_week")
)
avg_total_day_of_week.orderBy("day_of_week").show()

+-----------+---------------+
|day_of_week|avg_day_of_week|
+-----------+---------------+
|          1|         130.31|
|          2|         126.08|
|          3|         129.03|
|          4|         126.62|
|          5|         129.63|
|          6|         134.52|
|          7|         131.17|
+-----------+---------------+



4.3. Продажи по месяцам

In [343]:
avg_transaction_month = df_retail.groupBy("transaction_month").agg(
    F.round(F.avg("total_spent"), 2).alias("avg_transaction_month")
)
avg_transaction_month.orderBy("transaction_month").show()

+-----------------+---------------------+
|transaction_month|avg_transaction_month|
+-----------------+---------------------+
|                1|               135.19|
|                2|               129.76|
|                3|               126.94|
|                4|                132.0|
|                5|               127.47|
|                6|               131.32|
|                7|               127.65|
|                8|               122.77|
|                9|               130.62|
|               10|               128.05|
|               11|               130.04|
|               12|               132.32|
+-----------------+---------------------+



4.4. Признаки клиента

In [344]:
customer_lifetime_value = df_retail.groupBy("customer_id").agg(
        (F.avg(F.col("total_spent")) *
        F.count(F.col("total_spent")) *
        (F.datediff(F.max(F.col("transaction_date")), F.min(F.col("transaction_date")))))
        .alias("CVL")
)
customer_lifetime_value.orderBy(F.desc("CVL")).show(10)

+-----------+-------------------+
|customer_id|                CVL|
+-----------+-------------------+
|    CUST_24|7.190870399999999E7|
|    CUST_05|       7.06880385E7|
|    CUST_16|       7.01990905E7|
|    CUST_08|6.976405500000001E7|
|    CUST_13|       6.93800655E7|
|    CUST_15|        6.7553444E7|
|    CUST_10|         6.712866E7|
|    CUST_23|        6.6190258E7|
|    CUST_22|          6.60006E7|
|    CUST_21|        6.5960734E7|
+-----------+-------------------+
only showing top 10 rows



In [345]:
spark.stop()