# Шаг 1. Подготовка БД

In [1]:
# Инициализация Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("Hotel Data Analysis") \
    .getOrCreate()

# Загрузка данных
df = spark.read.csv("Hotel.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("logs_hotel")

In [23]:
from pyspark.sql.functions import col, expr, sum as spark_sum, round, coalesce, when

In [27]:
print("Схема данных:")
df.printSchema()

Схема данных:
root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)



In [4]:
print(f"Всего строк: {df.count()}")

Всего строк: 36275


In [33]:
df.show(5)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|          65.0|       

# Шаг 2. Создание таблицы-календаря

In [30]:
calendar_df = spark.range(0, 730, 1) \
    .withColumn("id_int", col("id").cast("int")) \
    .withColumn(
        "calendar_dt",
        expr("date_add('2017-01-01', id_int)")
    ) \
    .select("calendar_dt")

calendar_df.createOrReplaceTempView("calendar")

In [31]:
calendar_df.show(5)

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
+-----------+
only showing top 5 rows


# Запросы на оценивание.


### Запрос 1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [10]:
query1 = spark.sql("""
    SELECT
        year,
        month,
        ROUND(
            AVG(weekend_nights + week_nights),
            2
        ) as avg_nights,
        COUNT(*) as confirmed_bookings
    FROM logs_hotel
    WHERE status = 'Not_Canceled'  -- Только подтвержденные брони
    GROUP BY year, month
    ORDER BY year, month
""")

query1.show()

+----+-----+----------+------------------+
|year|month|avg_nights|confirmed_bookings|
+----+-----+----------+------------------+
|2017|    7|      3.02|               120|
|2017|    8|      2.72|               829|
|2017|    9|      2.66|              1467|
|2017|   10|       2.7|              1611|
|2017|   11|      2.72|               620|
|2017|   12|      3.04|               906|
|2018|    1|      2.74|               990|
|2018|    2|      2.69|              1274|
|2018|    3|      3.04|              1658|
|2018|    4|      2.92|              1741|
|2018|    5|      2.81|              1650|
|2018|    6|       2.6|              1912|
|2018|    7|      3.19|              1486|
|2018|    8|      3.15|              1496|
|2018|    9|      2.79|              1606|
|2018|   10|      2.89|              1826|
|2018|   11|      2.98|              1485|
|2018|   12|      3.25|              1713|
+----+-----+----------+------------------+




### Запрос 2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год

In [11]:
query2 = spark.sql("""
    WITH monthly_cancellations AS (
        SELECT
            month,
            COUNT(*) as total_bookings,
            SUM(
                CASE
                    WHEN status = 'Canceled' THEN 1
                    ELSE 0
                END
            ) as canceled_bookings
        FROM logs_hotel
        WHERE year = 2018  -- Фильтр по 2018 году
        GROUP BY month
    )
    SELECT
        month,
        total_bookings,
        canceled_bookings,
        ROUND(
            (canceled_bookings * 100.0 / total_bookings),
            2
        ) as cancellation_rate_percent
    FROM monthly_cancellations
    ORDER BY cancellation_rate_percent DESC
    LIMIT 3  -- TOП-3 месяца
""")

query2.show()

+-----+--------------+-----------------+-------------------------+
|month|total_bookings|canceled_bookings|cancellation_rate_percent|
+-----+--------------+-----------------+-------------------------+
|    8|          2799|             1303|                    46.55|
|   10|          3404|             1578|                    46.36|
|    9|          2962|             1356|                    45.78|
+-----+--------------+-----------------+-------------------------+




### Запрос 3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней

In [12]:
query3 = spark.sql("""
    SELECT
        year,
        month,
        ROUND(AVG(lead_time), 2) as avg_lead_time_days,
        COUNT(*) as bookings_count
    FROM logs_hotel
    WHERE status = 'Not_Canceled'  -- Только подтвержденные брони
    GROUP BY year, month
    ORDER BY year, month
""")

query3.show(24)

+----+-----+------------------+--------------+
|year|month|avg_lead_time_days|bookings_count|
+----+-----+------------------+--------------+
|2017|    7|            130.73|           120|
|2017|    8|             35.08|           829|
|2017|    9|             51.72|          1467|
|2017|   10|             55.89|          1611|
|2017|   11|             33.28|           620|
|2017|   12|             46.75|           906|
|2018|    1|             34.87|           990|
|2018|    2|             30.53|          1274|
|2018|    3|             43.19|          1658|
|2018|    4|             62.49|          1741|
|2018|    5|             60.99|          1650|
|2018|    6|             70.64|          1912|
|2018|    7|             86.88|          1486|
|2018|    8|             83.09|          1496|
|2018|    9|             63.32|          1606|
|2018|   10|             73.24|          1826|
|2018|   11|             44.25|          1485|
|2018|   12|             69.75|          1713|
+----+-----+-


### Запрос 4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT)

In [15]:
market_segments = spark.sql("""
    SELECT DISTINCT market_segment
    FROM logs_hotel
    WHERE market_segment IS NOT NULL
""").collect()

# Формируем список для PIVOT
segments_list = [row['market_segment'] for row in market_segments]
segments_str = ", ".join([f"'{s}'" for s in segments_list])

# PIVOT запрос
query4_sql = f"""
    SELECT *
    FROM (
        SELECT
            year,
            month,
            market_segment,
            ROUND(
                AVG(avg_room_price * (weekend_nights + week_nights)),
                2
            ) as avg_revenue
        FROM logs_hotel
        WHERE status = 'Not_Canceled'  -- Только подтвержденные брони
        GROUP BY year, month, market_segment
    ) source_table
    PIVOT (
        ROUND(AVG(avg_revenue), 2)
        FOR market_segment IN ({segments_str})
    )
    ORDER BY year, month
"""

query4 = spark.sql(query4_sql)

print("PIVOT таблица средней выручки по типам бронирования:")
query4.show()

PIVOT таблица средней выручки по типам бронирования:
+----+-----+-------------+--------+---------+------+-------+
|year|month|Complementary|Aviation|Corporate|Online|Offline|
+----+-----+-------------+--------+---------+------+-------+
|2017|    7|         22.4|    NULL|   113.75|290.56| 228.95|
|2017|    8|         0.32|    NULL|   156.42|284.21| 235.54|
|2017|    9|        16.89|    NULL|   177.83|348.55| 236.65|
|2017|   10|         1.09|    NULL|   180.26|311.47| 223.24|
|2017|   11|        14.81|    NULL|   102.97|240.52| 198.36|
|2017|   12|         0.25|    NULL|   141.11|258.93| 253.86|
|2018|    1|         2.27|    NULL|   113.03|236.09| 210.51|
|2018|    2|         1.39|   352.0|   115.06|238.07| 251.85|
|2018|    3|        38.17|  118.33|   142.39|301.71| 233.39|
|2018|    4|          0.0|  321.81|   108.42|320.08| 236.44|
|2018|    5|          0.0|   262.5|    229.5|352.34| 274.55|
|2018|    6|          0.0|   247.0|   148.13|335.03| 251.98|
|2018|    7|         5.38|    79


### Запрос 5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость

In [18]:
query5 = spark.sql("""
    WITH guest_revenue AS (
        SELECT
            ID as guest_id,
            COUNT(*) as bookings_count,
            ROUND(
                SUM(
                    avg_room_price * (weekend_nights + week_nights)
                ),
                2
            ) as total_revenue
        FROM logs_hotel
        WHERE repeated_guest = 1  -- Только постоянные гости
          AND status = 'Not_Canceled'  -- Только подтвержденные брони
        GROUP BY ID
    ),
    total_revenue_all AS (
        -- Общая выручка от всех постоянных гостей
        SELECT
            SUM(total_revenue) as overall_revenue
        FROM guest_revenue
    ),
    ranked_guests AS (
        -- Ранжируем гостей по выручке
        SELECT
            gr.guest_id,
            gr.bookings_count,
            gr.total_revenue,
            ROW_NUMBER() OVER (ORDER BY gr.total_revenue DESC) as revenue_rank
        FROM guest_revenue gr
    )
    SELECT
        rg.guest_id,
        rg.bookings_count,
        rg.total_revenue,
        ROUND(
            (rg.total_revenue * 100.0 / tra.overall_revenue),
            4
        ) as revenue_share_percent,
        rg.revenue_rank
    FROM ranked_guests rg
    CROSS JOIN total_revenue_all tra
    WHERE rg.revenue_rank <= 5  -- TOP-5 гостей
    ORDER BY rg.revenue_rank
""")

print("TOP-5 постоянных гостей по выручке:")
query5.show()

TOP-5 постоянных гостей по выручке:
+--------+--------------+-------------+---------------------+------------+
|guest_id|bookings_count|total_revenue|revenue_share_percent|revenue_rank|
+--------+--------------+-------------+---------------------+------------+
|INN19235|             1|       1754.4|                 1.55|           1|
|INN05222|             1|        690.0|               0.6096|           2|
|INN14189|             1|        665.0|               0.5875|           3|
|INN09923|             1|        660.0|               0.5831|           4|
|INN25479|             1|        650.0|               0.5743|           5|
+--------+--------------+-------------+---------------------+------------+




### Запрос 6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек

In [41]:
query6 = spark.sql("""
    WITH expanded_stays AS (
        SELECT
            date_add(
                to_date(concat(year, '-', lpad(month, 2, '0'), '-', lpad(date, 2, '0'))),
                day_num
            ) as stay_date,
            (n_adults + n_children) as guests
        FROM logs_hotel
        LATERAL VIEW explode(sequence(0, weekend_nights + week_nights - 1)) t AS day_num
        WHERE status = 'Not_Canceled'
          AND NOT (year = 2018 AND month = 2 AND date = 29)  -- Исключаем некорректную дату
    ),
    daily_totals AS (
        -- Суммируем гостей по дням
        SELECT stay_date, SUM(guests) as total_guests
        FROM expanded_stays
        WHERE stay_date IS NOT NULL
        GROUP BY stay_date
    )
    SELECT
        c.calendar_dt as date,
        COALESCE(d.total_guests, 0) as total_guests,
        ROUND(COALESCE(d.total_guests, 0) * 100.0 / 400, 2) as occupancy_rate_percent
    FROM calendar c
    LEFT JOIN daily_totals d ON c.calendar_dt = d.stay_date
    ORDER BY c.calendar_dt DESC
""")

# 3. Показываем результат
print("Первые 20 дней (отсортировано по убыванию даты):")
query6.show()

Первые 20 дней (отсортировано по убыванию даты):
+----------+------------+----------------------+
|      date|total_guests|occupancy_rate_percent|
+----------+------------+----------------------+
|2018-12-31|         562|                140.50|
|2018-12-30|         572|                143.00|
|2018-12-29|         542|                135.50|
|2018-12-28|         507|                126.75|
|2018-12-27|         552|                138.00|
|2018-12-26|         422|                105.50|
|2018-12-25|         397|                 99.25|
|2018-12-24|         373|                 93.25|
|2018-12-23|         341|                 85.25|
|2018-12-22|         282|                 70.50|
|2018-12-21|         247|                 61.75|
|2018-12-20|         240|                 60.00|
|2018-12-19|         228|                 57.00|
|2018-12-18|         258|                 64.50|
|2018-12-17|         274|                 68.50|
|2018-12-16|         254|                 63.50|
|2018-12-15|        