In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg, countDistinct, year, month
from pyspark.sql.functions import to_date, coalesce, col
from pyspark.sql.functions import when, col, to_date, year, month

In [2]:
# 1. Создаем SparkSession
spark = SparkSession.builder \
    .appName("PizzaSalesEDA") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/11 21:08:50 WARN Utils: Your hostname, prodata, resolves to a loopback address: 127.0.1.1; using 192.168.31.102 instead (on interface enp4s0)
25/08/11 21:08:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/11 21:08:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# 2. Читаем CSV-файл, имитируя его разбивку на несколько частей
# (в реальном случае - данные уже будут в партициях, например, в HDFS или S3)
df = spark.read.option("header", True).option("inferSchema", True) \
    .csv("data/pizza_sales.csv")

# Пробуем несколько форматов безопасно
df_with_date = df.withColumn(
    "datafield_iso",
    when(
        col("order_date").contains("/"),  # если есть /
        to_date(col("order_date"), "d/M/yyyy")
    ).when(
        col("order_date").contains("-"),  # если есть -
        to_date(col("order_date"), "dd-MM-yyyy")
    )
)

                                                                                

In [4]:
# 3. EDA — быстрая разведка данных
print("===== Схема данных =====")
df_with_date.printSchema()

===== Схема данных =====
root
 |-- pizza_id: double (nullable = true)
 |-- order_id: double (nullable = true)
 |-- pizza_name_id: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_time: timestamp (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- pizza_size: string (nullable = true)
 |-- pizza_category: string (nullable = true)
 |-- pizza_ingredients: string (nullable = true)
 |-- pizza_name: string (nullable = true)
 |-- datafield_iso: date (nullable = true)



In [5]:
print("===== Первые 5 строк =====")
df_with_date.show(5, truncate=False)

===== Первые 5 строк =====
+--------+--------+-------------+--------+----------+-------------------+----------+-----------+----------+--------------+--------------------------------------------------------------------------------------------+-------------------------+-------------+
|pizza_id|order_id|pizza_name_id|quantity|order_date|order_time         |unit_price|total_price|pizza_size|pizza_category|pizza_ingredients                                                                           |pizza_name               |datafield_iso|
+--------+--------+-------------+--------+----------+-------------------+----------+-----------+----------+--------------+--------------------------------------------------------------------------------------------+-------------------------+-------------+
|1.0     |1.0     |hawaiian_m   |1.0     |1/1/2015  |2025-08-11 11:38:36|13.25     |13.25      |M         |Classic       |Sliced Ham, Pineapple, Mozzarella Cheese                                           

In [6]:
# Количество записей и уникальных клиентов
total_orders = df_with_date.select(countDistinct("order_id")).collect()[0][0]
unique_pizza_name_id = df_with_date.select(countDistinct("pizza_name_id")).collect()[0][0]
print(f"Всего заказов: {total_orders}")
print(f"Уникальное число заказанных пицц: {unique_pizza_name_id}")

                                                                                

Всего заказов: 21350
Уникальное число заказанных пицц: 91


In [7]:
# 4. Числовая статистика по сумме заказов
df_with_date.describe(["total_price", "quantity"]).show()

+-------+-----------------+------------------+
|summary|      total_price|          quantity|
+-------+-----------------+------------------+
|  count|            48620|             48620|
|   mean|16.82147367338569|1.0196215549156726|
| stddev|4.437397581181093|0.1430770093247218|
|    min|             9.75|               1.0|
|    max|             83.0|               4.0|
+-------+-----------------+------------------+



In [8]:
# 5. Топ-5 самых популярных пицц
top_pizzas = (
    df_with_date.groupBy("pizza_name")
      .agg(_sum("quantity").alias("total_sold"))
      .orderBy(col("total_sold").desc())
      .limit(5)
)
print("===== Топ-5 популярных пицц =====")
top_pizzas.show()

===== Топ-5 популярных пицц =====
+--------------------+----------+
|          pizza_name|total_sold|
+--------------------+----------+
|The Classic Delux...|    2453.0|
|The Barbecue Chic...|    2432.0|
|  The Hawaiian Pizza|    2422.0|
| The Pepperoni Pizza|    2418.0|
|The Thai Chicken ...|    2371.0|
+--------------------+----------+



In [9]:
# 6. Анализ по месяцам и категориям пицц (агрегация + фильтрация)
# Предположим, у нас есть колонка order_date

df_year_month = df_with_date.withColumn("year", year(col("datafield_iso"))) \
                           .withColumn("month", month(col("datafield_iso")))

monthly_stats = (
    df_year_month.groupBy("year", "month", "pizza_category")
                .agg(
                    _sum("total_price").alias("revenue"),
                    avg("total_price").alias("avg_order_value"),
                    _sum("quantity").alias("total_pizzas")
                )
                .filter(col("revenue") > 5000)
                .orderBy("year", "month", col("revenue").desc())
)

print("===== Месячная статистика по категориям =====")
monthly_stats.show(20, truncate=False)

===== Месячная статистика по категориям =====


[Stage 21:>                                                         (0 + 2) / 2]

+----+-----+--------------+------------------+------------------+------------+
|year|month|pizza_category|revenue           |avg_order_value   |total_pizzas|
+----+-----+--------------+------------------+------------------+------------+
|2015|1    |Classic       |18619.4           |15.039903069466883|1257.0      |
|2015|1    |Supreme       |17929.749999999996|17.475389863547754|1044.0      |
|2015|1    |Veggie        |17055.400000000027|17.08957915831666 |1018.0      |
|2015|1    |Chicken       |16188.75          |18.108221476510067|913.0       |
|2015|2    |Classic       |17336.100000000002|14.970725388601037|1178.0      |
|2015|2    |Supreme       |16718.84999999999 |17.63591772151898 |964.0       |
|2015|2    |Veggie        |15636.400000000025|16.867745415318257|944.0       |
|2015|2    |Chicken       |15468.25          |18.007275902211873|875.0       |
|2015|3    |Classic       |18116.600000000002|14.960033030553264|1236.0      |
|2015|3    |Chicken       |17625.5           |17.966

                                                                                

In [10]:
# 7. Пример сохранения результата в Parquet
monthly_stats.write.mode("overwrite").parquet("output/monthly_stats")


                                                                                

In [11]:
# 8. Завершаем работу
spark.stop()