### Шаг 1. Создать таблицу, используя csv-файл

In [2]:
!pip install -q pyspark


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("ColabSpark") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext

In [10]:
from google.colab import files
uploaded = files.upload()

import pandas as pd
import io

df = spark.read.csv(
    "Hotel.csv",
    header=True,       # если первая строка — заголовки
    inferSchema=True   # попытаться вывести типы
)

df.show(5)
df.printSchema()

Saving Hotel.csv to Hotel (1).csv
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                   

In [11]:
df.write.saveAsTable("logs_hotel")

In [12]:
spark.sql("SELECT * FROM logs_hotel LIMIT 10").show()

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|          65.0|       

### Шаг 2. Создать (сгенерировать) таблицу calendar, который будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.

In [13]:
spark.sql("""
CREATE TABLE IF NOT EXISTS calendar (
  calendar_dt DATE
)
USING parquet
""")

DataFrame[]

In [14]:
spark.sql("""
INSERT INTO calendar
SELECT date as calendar_dt
FROM (
    SELECT explode(sequence(
        to_date('2017-01-01'),
        to_date('2018-12-31'),
        interval 1 day
    )) as date
)
""")

DataFrame[]

In [15]:
spark.sql("SELECT * FROM calendar LIMIT 10").show()

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
| 2017-01-06|
| 2017-01-07|
| 2017-01-08|
| 2017-01-09|
| 2017-01-10|
+-----------+



### Шаг 3. Выполнить следующие запросы:


### Запрос 1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [18]:
spark.sql("DROP TABLE IF EXISTS calendar_with_counts")

spark.sql("""
CREATE TABLE IF NOT EXISTS calendar_with_counts (
  calendar_dt DATE,
  active_bookings INT,
  monthly_unique_bookings INT,
  year INT,
  month INT
)
USING parquet
""")

spark.sql("SELECT * FROM calendar_with_counts LIMIT 10").show()

+-----------+---------------+-----------------------+----+-----+
|calendar_dt|active_bookings|monthly_unique_bookings|year|month|
+-----------+---------------+-----------------------+----+-----+
+-----------+---------------+-----------------------+----+-----+



In [39]:
query_1 = """
WITH filtered_hotel AS (
    SELECT *
    FROM logs_hotel
    WHERE NOT (year = 2018 AND month = 2 AND date = 29)
        AND status = 'Not_Canceled'
        AND (weekend_nights + week_nights) > 0
),
monthly_stats AS (
    SELECT
        YEAR(c.calendar_dt) as year,
        MONTH(c.calendar_dt) as month,
        COUNT(DISTINCT
            CASE
                WHEN h.ID IS NOT NULL
                THEN CONCAT(c.calendar_dt, '-', h.ID)
            END
        ) as total_daily_active_bookings,
        COUNT(DISTINCT h.ID) as unique_ids_per_month,
        ROUND(try_divide(total_daily_active_bookings, unique_ids_per_month), 2) as mean_nights_per_month
    FROM calendar c
    LEFT JOIN filtered_hotel h
        ON c.calendar_dt >= TO_DATE(
            CONCAT(
                CAST(h.year AS STRING),
                '-',
                LPAD(CAST(h.month AS STRING), 2, '0'),
                '-',
                LPAD(CAST(h.date AS STRING), 2, '0')
            )
        )
        AND c.calendar_dt <= DATE_ADD(
            TO_DATE(
                CONCAT(
                    CAST(h.year AS STRING),
                    '-',
                    LPAD(CAST(h.month AS STRING), 2, '0'),
                    '-',
                    LPAD(CAST(h.date AS STRING), 2, '0')
                )
            ),
            (h.weekend_nights + h.week_nights - 1)
        )
    GROUP BY YEAR(c.calendar_dt), MONTH(c.calendar_dt)
)
SELECT mean_nights_per_month, year, month
FROM monthly_stats
ORDER BY year, month
"""
spark.sql(query_1).show()

+---------------------+----+-----+
|mean_nights_per_month|year|month|
+---------------------+----+-----+
|                 NULL|2017|    1|
|                 NULL|2017|    2|
|                 NULL|2017|    3|
|                 NULL|2017|    4|
|                 NULL|2017|    5|
|                 NULL|2017|    6|
|                 2.57|2017|    7|
|                 2.65|2017|    8|
|                 2.58|2017|    9|
|                  2.6|2017|   10|
|                 2.64|2017|   11|
|                 2.66|2017|   12|
|                  2.5|2018|    1|
|                 2.53|2018|    2|
|                 2.87|2018|    3|
|                 2.73|2018|    4|
|                 2.61|2018|    5|
|                 2.44|2018|    6|
|                 3.03|2018|    7|
|                 2.99|2018|    8|
+---------------------+----+-----+
only showing top 20 rows


### Запрос 2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год

In [40]:
query2 = """
WITH monthly_stats_2018 AS (
    SELECT
        month,
        COUNT(*) as total_bookings,
        SUM(CASE WHEN status = 'Canceled' THEN 1 ELSE 0 END) as canceled_bookings
    FROM logs_hotel
    WHERE year = 2018
    GROUP BY month
),
monthly_percentages AS (
    SELECT
        month,
        total_bookings,
        canceled_bookings,
        ROUND(
            (canceled_bookings * 100.0) / NULLIF(total_bookings, 0),
            2
        ) as cancel_percentage
    FROM monthly_stats_2018
    WHERE total_bookings > 0
)
SELECT
    month,
    total_bookings,
    canceled_bookings,
    cancel_percentage || '%' as cancel_percentage_formatted
FROM monthly_percentages
ORDER BY cancel_percentage DESC
LIMIT 3
"""
spark.sql(query2).show()

+-----+--------------+-----------------+---------------------------+
|month|total_bookings|canceled_bookings|cancel_percentage_formatted|
+-----+--------------+-----------------+---------------------------+
|    8|          2799|             1303|                     46.55%|
|   10|          3404|             1578|                     46.36%|
|    9|          2962|             1356|                     45.78%|
+-----+--------------+-----------------+---------------------------+



### Запрос 3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней

In [43]:
query3 = """
SELECT
    year,
    month,
    COUNT(*) as confirmed_bookings,
    ROUND(AVG(lead_time), 1) as avg_days_before_checkin
FROM logs_hotel
WHERE status = 'Not_Canceled'
GROUP BY year, month
ORDER BY year, month
"""
spark.sql(query3).show()

+----+-----+------------------+-----------------------+
|year|month|confirmed_bookings|avg_days_before_checkin|
+----+-----+------------------+-----------------------+
|2017|    7|               120|                  130.7|
|2017|    8|               829|                   35.1|
|2017|    9|              1467|                   51.7|
|2017|   10|              1611|                   55.9|
|2017|   11|               620|                   33.3|
|2017|   12|               906|                   46.7|
|2018|    1|               990|                   34.9|
|2018|    2|              1274|                   30.5|
|2018|    3|              1658|                   43.2|
|2018|    4|              1741|                   62.5|
|2018|    5|              1650|                   61.0|
|2018|    6|              1912|                   70.6|
|2018|    7|              1486|                   86.9|
|2018|    8|              1496|                   83.1|
|2018|    9|              1606|                 

### Запрос 4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT)

In [47]:
query4 = """
WITH confirmed_bookings AS (
    SELECT
        year,
        month,
        avg_room_price * (weekend_nights + week_nights) as total_revenue
    FROM logs_hotel
    WHERE status = 'Not_Canceled'
),
monthly_revenue AS (
    SELECT
        year,
        month,
        ROUND(AVG(total_revenue), 2) as avg_monthly_revenue
    FROM confirmed_bookings
    GROUP BY year, month
)
SELECT *
FROM monthly_revenue
PIVOT (
    ROUND(AVG(avg_monthly_revenue), 2) as avg_revenue
    FOR month IN (
        1 as Jan, 2 as Feb, 3 as Mar, 4 as Apr, 5 as May, 6 as Jun,
        7 as Jul, 8 as Aug, 9 as Sep, 10 as Oct, 11 as Nov, 12 as Dec
    )
)
ORDER BY year
"""
spark.sql(query4).show()

+----+------+------+------+-----+------+-----+------+------+------+------+------+------+
|year|   Jan|   Feb|   Mar|  Apr|   May|  Jun|   Jul|   Aug|   Sep|   Oct|   Nov|   Dec|
+----+------+------+------+-----+------+-----+------+------+------+------+------+------+
|2017|  NULL|  NULL|  NULL| NULL|  NULL| NULL|233.96|246.36|275.66|254.18|197.65|235.67|
|2018|208.47|219.83|265.14|291.7|315.17|287.8|355.16|369.65|339.48|315.03|290.17|310.88|
+----+------+------+------+-----+------+-----+------+------+------+------+------+------+



### Запрос 5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость

In [50]:
query5 = """
WITH repeated_guests_revenue AS (
    SELECT
        ID as guest_id,
        SUM(avg_room_price * (weekend_nights + week_nights)) as total_revenue
    FROM logs_hotel
    WHERE repeated_guest = 1
        AND status = 'Not_Canceled'
    GROUP BY ID
),
total_revenue_all AS (
    SELECT
        SUM(total_revenue) as total_revenue_all_repeated_guests
    FROM repeated_guests_revenue
)
SELECT
    ROW_NUMBER() OVER (ORDER BY rg.total_revenue DESC) as rank,
    rg.guest_id,
    ROUND(rg.total_revenue, 2) as total_revenue,
    ROUND(
        rg.total_revenue * 100.0 / tra.total_revenue_all_repeated_guests,
        2
    ) as revenue_share
FROM repeated_guests_revenue rg
CROSS JOIN total_revenue_all tra
ORDER BY rg.total_revenue DESC
LIMIT 5
"""
spark.sql(query5).show()

+----+--------+-------------+-------------+
|rank|guest_id|total_revenue|revenue_share|
+----+--------+-------------+-------------+
|   1|INN19235|       1754.4|         1.55|
|   2|INN05222|        690.0|         0.61|
|   3|INN14189|        665.0|         0.59|
|   4|INN09923|        660.0|         0.58|
|   5|INN25479|        650.0|         0.57|
+----+--------+-------------+-------------+



### Запрос 6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек

In [54]:
query6 = """
WITH daily_occupancy AS (
    SELECT
        c.calendar_dt as date,
        COALESCE(SUM(h.n_adults + h.n_children), 0) as total_guests
    FROM calendar c
LEFT JOIN (
    SELECT *
    FROM logs_hotel
    WHERE NOT (year = 2018 AND month = 2 AND date = 29)
        AND status = 'Not_Canceled'
) h
ON c.calendar_dt >= TO_DATE(
    CONCAT(h.year, '-',
           LPAD(h.month, 2, '0'), '-',
           LPAD(h.date, 2, '0'))
)
        AND c.calendar_dt <= DATE_ADD(
            TO_DATE(
                CONCAT(h.year, '-',
                      LPAD(h.month, 2, '0'), '-',
                      LPAD(h.date, 2, '0'))
            ),
            (h.weekend_nights + h.week_nights - 1)
        )
        AND h.status = 'Not_Canceled'
    GROUP BY c.calendar_dt
)
SELECT
    date,
    total_guests,
    400 as hotel_capacity,
    400 - total_guests as available_beds,
    ROUND((total_guests * 100.0) / 400, 2) as occupancy_rate_percent
FROM daily_occupancy
ORDER BY date DESC
"""
spark.sql(query6).show()

+----------+------------+--------------+--------------+----------------------+
|      date|total_guests|hotel_capacity|available_beds|occupancy_rate_percent|
+----------+------------+--------------+--------------+----------------------+
|2018-12-31|         562|           400|          -162|                140.50|
|2018-12-30|         572|           400|          -172|                143.00|
|2018-12-29|         542|           400|          -142|                135.50|
|2018-12-28|         507|           400|          -107|                126.75|
|2018-12-27|         552|           400|          -152|                138.00|
|2018-12-26|         422|           400|           -22|                105.50|
|2018-12-25|         397|           400|             3|                 99.25|
|2018-12-24|         373|           400|            27|                 93.25|
|2018-12-23|         341|           400|            59|                 85.25|
|2018-12-22|         282|           400|           1