In [17]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import *

In [25]:
spark_session = SparkSession.builder \
    .appName("HotelData") \
    .getOrCreate()

hotel_schema = StructType([
    StructField("ID", IntegerType(), True),  # Уникальный идентификатор брони
    StructField("n_adults", IntegerType(), True),  # Количество взрослых
    StructField("n_children", IntegerType(), True),  # Количество детей
    StructField("weekend_nights", IntegerType(), True),  # Количество забронированных ночей (суббота–воскресенье)
    StructField("week_nights", IntegerType(), True),  # Количество забронированных ночей (понедельник–пятница)
    StructField("meal_plan", StringType(), True),  # План питания
    StructField("car_parking_space", IntegerType(), True),  # Требуется ли парковка? (0 — нет, 1 — да)
    StructField("room_type", StringType(), True),  # Тип номера
    StructField("lead_time", IntegerType(), True),  # Количество дней между датой бронирования и датой прибытия
    StructField("year", IntegerType(), True),  # Год заселения
    StructField("month", IntegerType(), True),  # Месяц заселения
    StructField("date", IntegerType(), True),  # День заселения
    StructField("market_segment", StringType(), True),  # Тип бронирования (онлайн/оффлайн)
    StructField("repeated_guest", IntegerType(), True),  # Постоянный гость? (0 — нет, 1 — да)
    StructField("previous_cancellations", IntegerType(), True),  # Количество предыдущих отмен
    StructField("previous_bookings_not_canceled", IntegerType(), True),  # Количество не отмененных броней
    StructField("avg_room_price", DoubleType(), True),  # Средняя цена в день бронирования
    StructField("special_requests", IntegerType(), True),  # Количество специальных запросов
    StructField("status", IntegerType(), True)  # Флаг отмены бронирования
])

#Шаг 1

In [43]:
df = spark_session.read.csv("Hotel.csv", header=True, inferSchema=True)

# Показываем данные
df.show(5)
df.printSchema()

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|          65.0|       

#Шаг 2

In [37]:
from pyspark.sql.functions import explode, sequence, to_date, lit

spark = SparkSession.builder.appName("Calendar Generator").getOrCreate()

# Определяем начальную и конечную даты
start_date = "2017-01-01"
end_date = "2018-12-31"

# Создаем calendar с помощью sequence
calendar_df = spark.sql(f"""SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'))) as calendar_dt""")

calendar_df.show(10)
print(f"Всего дней: {calendar_df.count()}")

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
| 2017-01-06|
| 2017-01-07|
| 2017-01-08|
| 2017-01-09|
| 2017-01-10|
+-----------+
only showing top 10 rows
Всего дней: 730


#Шаг 3

Задание 1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [45]:
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType

In [46]:
df = df.withColumn("weekend_nights", f.col("weekend_nights").cast(IntegerType())) \
       .withColumn("week_nights", f.col("week_nights").cast(IntegerType())) \
       .withColumn("year", f.col("year").cast(IntegerType())) \
       .withColumn("month", f.col("month").cast(IntegerType())) \
       .withColumn("date", f.col("date").cast(IntegerType()))

In [51]:
# Рассчитаем среднее количество ночей для подтвержденных бронирований
result = df.filter(f.col("status") == "Not_Canceled") \
    .withColumn("total_nights", f.col("weekend_nights") + f.col("week_nights")) \
    .groupBy("year", "month") \
    .agg(
        f.avg("total_nights").alias("avg_nights_per_booking"),

    ) \
    .orderBy("year", "month")

print("Среднее количество ночей по месяцам:")
result.show()

Среднее количество ночей по месяцам:
+----+-----+----------------------+
|year|month|avg_nights_per_booking|
+----+-----+----------------------+
|2017|    7|    3.0166666666666666|
|2017|    8|    2.7189384800965017|
|2017|    9|    2.6550783912747105|
|2017|   10|    2.7032898820608318|
|2017|   11|    2.7241935483870967|
|2017|   12|     3.043046357615894|
|2018|    1|    2.7414141414141415|
|2018|    2|    2.6891679748822606|
|2018|    3|    3.0392038600723765|
|2018|    4|     2.924755887421022|
|2018|    5|    2.8054545454545456|
|2018|    6|     2.596757322175732|
|2018|    7|    3.1938088829071334|
|2018|    8|    3.1544117647058822|
|2018|    9|     2.786425902864259|
|2018|   10|    2.8910186199342824|
|2018|   11|    2.9784511784511785|
|2018|   12|    3.2521891418563924|
+----+-----+----------------------+



Задание 2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

In [52]:
# Фильтруем только 2018 год
df_2018 = df.filter(f.col("year") == 2018)

# Считаем статистику по месяцам
monthly_stats = df_2018 \
    .groupBy("year", "month") \
    .agg(
        f.count("*").alias("total_bookings"),
        f.sum(f.when(f.col("status") == "Canceled", 1).otherwise(0)).alias("canceled_bookings")
    ) \
    .withColumn(
        "cancellation_rate",
        f.round(f.col("canceled_bookings") / f.col("total_bookings") * 100, 2)
    ) \
    .orderBy(f.desc("cancellation_rate"))

# Берем ТОП-3 месяца
top_3_months = monthly_stats.limit(3)

print("ТОП-3 месяца по проценту отмененных броней в 2018 году:")
top_3_months.show(truncate=False)

ТОП-3 месяца по проценту отмененных броней в 2018 году:
+----+-----+--------------+-----------------+-----------------+
|year|month|total_bookings|canceled_bookings|cancellation_rate|
+----+-----+--------------+-----------------+-----------------+
|2018|8    |2799          |1303             |46.55            |
|2018|10   |3404          |1578             |46.36            |
|2018|9    |2962          |1356             |45.78            |
+----+-----+--------------+-----------------+-----------------+



Задание 3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.

In [57]:
result_simple = df.filter(f.col("status") == "Not_Canceled") \
    .groupBy("year", "month") \
    .agg(
        f.avg("lead_time").alias("avg_lead_time_days"),
        f.count("*").alias("confirmed_bookings_count"),
        f.min("lead_time").alias("min_lead_time"),
        f.max("lead_time").alias("max_lead_time")
    ) \
    .orderBy("year", "month") \
    .withColumn("avg_lead_time_days", f.round(f.col("avg_lead_time_days"), 1))

print("Средний lead_time (время между бронированием и заездом) по месяцам:")
result_simple.select(
    f.concat(f.col("year"), f.lit("-"), f.lpad(f.col("month"), 2, "0")).alias("period"),
    f.col("avg_lead_time_days").alias("avg_days_before_checkin"),

).show(truncate=False)

Средний lead_time (время между бронированием и заездом) по месяцам:
+-------+-----------------------+
|period |avg_days_before_checkin|
+-------+-----------------------+
|2017-07|130.7                  |
|2017-08|35.1                   |
|2017-09|51.7                   |
|2017-10|55.9                   |
|2017-11|33.3                   |
|2017-12|46.7                   |
|2018-01|34.9                   |
|2018-02|30.5                   |
|2018-03|43.2                   |
|2018-04|62.5                   |
|2018-05|61.0                   |
|2018-06|70.6                   |
|2018-07|86.9                   |
|2018-08|83.1                   |
|2018-09|63.3                   |
|2018-10|73.2                   |
|2018-11|44.3                   |
|2018-12|69.8                   |
+-------+-----------------------+



Задание 4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).

In [58]:
# Фильтруем только подтвержденные бронирования
confirmed_df = df.filter(f.col("status") == "Not_Canceled")

# Вычисляем общую выручку за одно бронирование
confirmed_with_revenue = confirmed_df.withColumn(
    "total_revenue",
    f.col("avg_room_price") * (f.col("weekend_nights") + f.col("week_nights"))
)

# Создаем колонку период (год-месяц) для группировки
confirmed_with_revenue = confirmed_with_revenue.withColumn(
    "period",
    f.concat(f.col("year"), f.lit("-"), f.lpad(f.col("month"), 2, "0"))
)

# Группируем по периоду и считаем среднюю выручку
monthly_revenue = confirmed_with_revenue.groupBy("period") \
    .agg(
        f.avg("total_revenue").alias("avg_monthly_revenue"),
        f.count("*").alias("bookings_count"),
        f.sum("total_revenue").alias("total_revenue_sum")
    ) \
    .withColumn("avg_monthly_revenue", f.round(f.col("avg_monthly_revenue"), 2)) \
    .withColumn("total_revenue_sum", f.round(f.col("total_revenue_sum"), 2)) \
    .orderBy("period")

print("Средняя месячная выручка по подтвержденным броням:")
monthly_revenue.show(truncate=False)

# Создаем сводную таблицу (PIVOT) по годам и месяцам
# Сначала добавляем колонки для pivot
pivot_data = confirmed_with_revenue \
    .withColumn("month_name",
                f.when(f.col("month") == 1, "Jan")
                 .when(f.col("month") == 2, "Feb")
                 .when(f.col("month") == 3, "Mar")
                 .when(f.col("month") == 4, "Apr")
                 .when(f.col("month") == 5, "May")
                 .when(f.col("month") == 6, "Jun")
                 .when(f.col("month") == 7, "Jul")
                 .when(f.col("month") == 8, "Aug")
                 .when(f.col("month") == 9, "Sep")
                 .when(f.col("month") == 10, "Oct")
                 .when(f.col("month") == 11, "Nov")
                 .when(f.col("month") == 12, "Dec"))

# Создаем pivot таблицу - средняя выручка по месяцам для каждого года
pivot_table = pivot_data.groupBy("year") \
    .pivot("month_name") \
    .agg(f.round(f.avg("total_revenue"), 2).alias("avg_revenue")) \
    .orderBy("year")

print("\nСводная таблица - средняя выручка по месяцам (по годам):")
pivot_table.show(truncate=False)

Средняя месячная выручка по подтвержденным броням:
+-------+-------------------+--------------+-----------------+
|period |avg_monthly_revenue|bookings_count|total_revenue_sum|
+-------+-------------------+--------------+-----------------+
|2017-07|233.96             |120           |28074.88         |
|2017-08|246.36             |829           |204236.53        |
|2017-09|275.66             |1467          |404397.08        |
|2017-10|254.18             |1611          |409479.44        |
|2017-11|197.65             |620           |122542.08        |
|2017-12|235.67             |906           |213519.27        |
|2018-01|208.47             |990           |206380.39        |
|2018-02|219.83             |1274          |280068.18        |
|2018-03|265.14             |1658          |439599.3         |
|2018-04|291.7              |1741          |507850.02        |
|2018-05|315.17             |1650          |520029.64        |
|2018-06|287.8              |1912          |550271.85        |
|201

Задание 5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

In [59]:
# 1. Вычисляем выручку для каждого бронирования
df_with_revenue = df.withColumn(
    "booking_revenue",
    f.col("avg_room_price") * (f.col("weekend_nights") + f.col("week_nights"))
)

# 2. Считаем выручку по каждому гостю (ID бронирования как ID гостя)
guest_revenue = df_with_revenue.groupBy("ID") \
    .agg(
        f.sum("booking_revenue").alias("total_revenue_per_guest"),
        f.count("*").alias("total_bookings_per_guest"),
        f.max("repeated_guest").alias("is_repeated_guest")
    )

# 3. Фильтруем только постоянных гостей (repeated_guest = 1)
repeated_guests = guest_revenue.filter(f.col("is_repeated_guest") == 1)

# 4. Считаем общую выручку от всех постоянных гостей
total_revenue_repeated = repeated_guests.agg(
    f.sum("total_revenue_per_guest").alias("total_revenue_all_repeated_guests")
).collect()[0]["total_revenue_all_repeated_guests"]

print(f"Общая выручка от всех постоянных гостей: {total_revenue_repeated:.2f}")

# 5. Определяем ТОП-5 постоянных гостей по выручке
top_5_repeated_guests = repeated_guests \
    .withColumn(
        "revenue_share_percent",
        f.round(f.col("total_revenue_per_guest") / total_revenue_repeated * 100, 2)
    ) \
    .orderBy(f.desc("total_revenue_per_guest")) \
    .limit(5)

print("\nТОП-5 постоянных гостей по выручке:")
top_5_repeated_guests.select(
    "ID",
    f.round("total_revenue_per_guest", 2).alias("общая_выручка"),
    "total_bookings_per_guest",
    f.concat(f.col("revenue_share_percent"), f.lit("%")).alias("доля_в_выручке_от_постоянных")
).show(truncate=False)

# 6. Считаем долю ТОП-5 в общей выручке от постоянных гостей
top_5_total = top_5_repeated_guests.agg(
    f.sum("total_revenue_per_guest").alias("top_5_total_revenue")
).collect()[0]["top_5_total_revenue"]

top_5_share = (top_5_total / total_revenue_repeated) * 100
print(f"\nИтого: ТОП-5 постоянных гостей приносят {top_5_share:.1f}% от общей выручки всех постоянных гостей")
print(f"Суммарная выручка ТОП-5: {top_5_total:.2f}")
print(f"Суммарная выручка всех постоянных гостей: {total_revenue_repeated:.2f}")

Общая выручка от всех постоянных гостей: 116004.48

ТОП-5 постоянных гостей по выручке:
+--------+-------------+------------------------+----------------------------+
|ID      |общая_выручка|total_bookings_per_guest|доля_в_выручке_от_постоянных|
+--------+-------------+------------------------+----------------------------+
|INN19235|1754.4       |1                       |1.51%                       |
|INN05222|690.0        |1                       |0.59%                       |
|INN14189|665.0        |1                       |0.57%                       |
|INN09923|660.0        |1                       |0.57%                       |
|INN25479|650.0        |1                       |0.56%                       |
+--------+-------------+------------------------+----------------------------+


Итого: ТОП-5 постоянных гостей приносят 3.8% от общей выручки всех постоянных гостей
Суммарная выручка ТОП-5: 4419.40
Суммарная выручка всех постоянных гостей: 116004.48


Задание 5. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.