In [4]:
from os import truncate
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import re

# Загрузка и вывод схемы: Загрузите файл retail_store_sales.csv.
# Выведите первые 5 строк загруженного DataFrame и его схему (df.printSchema()).
spark = SparkSession.builder \
        .appName('retail_store_sales') \
        .getOrCreate()

df = spark.read.options(header='True',inferSchema='True',delimiter=',') \
  .csv("retail_store_sales.csv")

df.show(5, truncate=False)
df.printSchema()

# Очистка названий столбцов: Преобразуйте названия всех столбцов к единому регистру - snake_case.
# Выведите обновленную схему DataFrame  или названия столбцов, чтобы убедиться в изменении названий.
def to_snake(name: str) -> str:
    s = re.sub(r'[.\s\-]+', '_', name)                            # пробелы/точки/дефисы → _
    s = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', s)              # HTTPServer → HTTP_Server
    s = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s)                 # orderID → order_ID
    s = re.sub(r'[^0-9A-Za-z_]', '', s)                           # убрать прочие символы
    s = re.sub(r'_+', '_', s).strip('_').lower()                  # одинарные _, нижний регистр
    return s or "col"

def dedupe(names):
    seen, out = set(), []
    for n in names:
        base, k, m = n, 1, n
        while m in seen:
            k += 1
            m = f"{base}_{k}"
        seen.add(m)
        out.append(m)
    return out

new_cols = dedupe([to_snake(c) for c in df.columns])
df_snake = df.toDF(*new_cols)
df_snake.printSchema()

# Преобразование типов данных: Проанализируйте к каким типам данных относятся данные в столбцах и приведите столбец к соответствующему типу.
# Убедитесь, что некорректные или отсутствующие значения преобразуются в null в соответствующих типах данных.
# inferSchema='True' определяет типы столбцов на основе данных, в колонках нет псевдо-null значений

#  Восстановление отсутствующих item:
# Так как данные статические для каждого товара, то составьте справочник товаров в отдельный DataFrame с Category, Item и Price Rer Unit.
# Для транзакций, где отсутствует название товара, но имеется категория и цена,
# попытайтесь определить название товара, путём объединения (join) с загруженным справочником товаров.
# Выведите 20 строк, демонстрирующих восстановленные значения.

product_dict = (df_snake
    .select("category", "price_per_unit", "item")
    .where(F.col("item").isNotNull())
    .groupBy("category", "price_per_unit")
    .agg(F.first("item", ignorenulls=True).alias("item_ref"))
)
to_fix = df_snake.where(
    F.col("item").isNull() &
    F.col("category").isNotNull() &
    F.col("price_per_unit").isNotNull()
)
restored = (to_fix.alias("t")
    .join(F.broadcast(product_dict).alias("d"), ["category","price_per_unit"], "left")
    .withColumn("item_restored", F.col("d.item_ref"))
    .where(F.col("item_restored").isNotNull())
)
restored.select(
    "transaction_id", "category", "price_per_unit",
    F.col("t.item").alias("item_before"),
    "item_restored"
).show(20, truncate=False)

# Восстановление Total Spent:  Найдите все транзакции, с пропусками в общей сумме и обновите ее,
# пересчитав её как quantity * price_per_unit для всех записей.
df_recalc = df_snake.withColumn(
    "total_spent",
    F.when(
        F.col("quantity").isNotNull() & F.col("price_per_unit").isNotNull(),
        F.round(F.col("quantity") * F.col("price_per_unit"), 2)
    ).otherwise(F.lit(None).cast("double"))
)
df_recalc.show(20, truncate=False)

# Заполнение отсутствующих Quantity и Rrice Rer Unit:
# Для транзакций, где отсутствуют значения о количестве проданного товара , но имеются сумма транзакции и цена за товар ,
# вычислите количество проданного товара и заполните пропущенные значения. Результат приведите к целому числу.
# Аналогично, если  отсутствует цена за единицу товара , но общая сумма и количество имеются, вычислите цену за единицу
# и заполните пропущенные значения. Округлите до двух знаков после запятой. Выведите 20 строк, демонстрирующих заполненные значения.
calc_quantity = F.when(
    F.col("quantity").isNull() &
    F.col("total_spent").isNotNull() &
    F.col("price_per_unit").isNotNull() &
    (F.col("price_per_unit") != 0),
    F.round(F.col("total_spent") / F.col("price_per_unit"), 0).cast(IntegerType())
)

calc_price = F.when(
    F.col("price_per_unit").isNull() &
    F.col("total_spent").isNotNull() &
    F.col("quantity").isNotNull() &
    (F.col("quantity") != 0),
    F.round(F.col("total_spent") / F.col("quantity"), 2).cast(DoubleType())
)

df_calc = (df_snake
    .withColumn("calc_quantity", calc_quantity)
    .withColumn("calc_price_per_unit", calc_price)
)

df_filled = (df_calc
    .withColumn("quantity", F.coalesce(F.col("quantity"), F.col("calc_quantity")))
    .withColumn("price_per_unit", F.coalesce(F.col("price_per_unit"), F.col("calc_price_per_unit")))
    .drop("calc_quantity", "calc_price_per_unit")
)


df_filled.show(20, truncate=False)

# Удалите оставшиеся строки с пропусками в Category, Quantity ,Total Spent и Rrice Rer Unit
df_clean = df_filled.filter(
    F.col("category").isNotNull() &
    F.col("quantity").isNotNull() &
    F.col("total_spent").isNotNull() &
    F.col("price_per_unit").isNotNull()
)
df_clean.show(20, truncate=False)

# Самые популярные категории товаров: Рассчитайте общее количество проданных единиц товара  для каждой категории.
# Определите Топ-5 категорий по общему количеству проданных единиц.
top5 = (df_clean
    .groupBy("category")
    .agg(F.sum(F.col("quantity")).alias("total_units"))
    .orderBy(F.desc("total_units"), F.asc("category"))
    .limit(5)
)
top5.show(truncate=False)

# Рассчитайте среднее значение Total Spent для каждого метода оплаты. Округлите до двух знаков после запятой.
avg_by_payment = (df_clean
    .where(F.col("payment_method").isNotNull())
    .groupBy("payment_method")
    .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
    .orderBy(F.desc("avg_total_spent"), F.asc("payment_method"))
)
avg_by_payment.show(truncate=False)

# Рассчитайте среднее значение Total Spent для каждой места где прошла оплата. Округлите до двух знаков после запятой.
avg_by_location = (df_clean
    .where(F.col("location").isNotNull())
    .groupBy("location")
    .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
    .orderBy(F.desc("avg_total_spent"), F.asc("location"))
)
avg_by_location.show(truncate=False)

# Временные признаки: Добавьте два новых столбца на основе Transaction Date:
# day_of_week: День недели
# transaction_month: Месяц транзакции
df_feat = (df_clean
    .withColumn("day_of_week_num", F.dayofweek("transaction_date"))
    .withColumn("day_of_week", F.date_format("transaction_date", "EEEE"))
    .withColumn("transaction_month", F.date_format("transaction_date", "yyyy-MM"))
)

# Продажи по дням недели: Рассчитайте среднюю сумму продаж (Total Spent) для каждого дня недели.
# Выведите результаты, отсортированные по дням недели.
avg_by_weekday = (df_feat
    .groupBy("day_of_week", "day_of_week_num")
    .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
    .orderBy(((F.col("day_of_week_num") + 5) % 7 + 1).asc())  # делаем порядок Monday=1 ... Sunday=7
    .select("day_of_week", "avg_total_spent")
)
avg_by_weekday.show(truncate=False)

# Продажи по месяцам: Рассчитайте среднюю сумму продаж (Total Spent)  для каждого месяца. Выведите результаты, отсортированные по месяцам.
avg_by_month = (df_feat
    .groupBy("transaction_month")
    .agg(F.round(F.avg("total_spent"), 2).alias("avg_total_spent"))
    .orderBy("transaction_month")
)
avg_by_month.show(truncate=False)

# Рассчитайте customer_lifetime_value (CLV) для каждого клиента как общую сумму (Total Spent), потраченную этим клиентом за все транзакции.
# Выведите Топ-10 клиентов по их CLV (customer_id и их CLV).
top10_clv = (df_feat
    .groupBy("customer_id")
    .agg(F.round(F.sum("total_spent"), 2).alias("clv"))
    .orderBy(F.col("clv").desc())
    .limit(10)
)
top10_clv.show(truncate=False)

spark.stop()

+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|Transaction ID|Customer ID|Category     |Item        |Price Per Unit|Quantity|Total Spent|Payment Method|Location|Transaction Date|Discount Applied|
+--------------+-----------+-------------+------------+--------------+--------+-----------+--------------+--------+----------------+----------------+
|TXN_6867343   |CUST_09    |Patisserie   |Item_10_PAT |18.5          |10.0    |185.0      |Digital Wallet|Online  |2024-04-08      |true            |
|TXN_3731986   |CUST_22    |Milk Products|Item_17_MILK|29.0          |9.0     |261.0      |Digital Wallet|Online  |2023-07-23      |true            |
|TXN_9303719   |CUST_02    |Butchers     |Item_12_BUT |21.5          |2.0     |43.0       |Credit Card   |Online  |2022-10-05      |false           |
|TXN_9458126   |CUST_06    |Beverages    |Item_16_BEV |27.5          |9.0     |247.5      |Credit Ca