# Домашнее задание 5. Анализ данных на Spark SQL

**Выполнила:** Смирнова Анастасия

In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [2]:
pip install pyspark



In [3]:
pip install findspark



In [4]:
import findspark
findspark.init()

In [5]:
import pyspark
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
spark = (
    SparkSession
    .builder
    .appName('Test_01')
    .config('spark.ui.port', '9311')
    .config('spark.executor.memoryOverhead', '1G')
    .config('spark.shuffle.service.enabled', 'true')
    .config('spark.dynamicAllocation.enabled', 'true')
    .config('spark.driver.extraClassPath', '/opt/spark/jars/sqljdbc42.jar')\
    .config('spark.executor.extraClassPath', '/opt/spark/jars/sqljdbc42.jar')\
    .getOrCreate()
)

In [6]:
spark

In [7]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, DateType, ByteType, DoubleType
from pyspark.sql import Window

# 1. Загрузка данных

In [8]:
# Загружаем датафрейм
hotel = spark.read.option("header",True).option("sep",",").csv("/content/Hotel.csv")

In [9]:
# Выводим первые 10 строк
hotel.show(10)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|            65|       

In [10]:
# Выводим схему
hotel.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: string (nullable = true)
 |-- n_children: string (nullable = true)
 |-- weekend_nights: string (nullable = true)
 |-- week_nights: string (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- date: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: string (nullable = true)
 |-- previous_cancellations: string (nullable = true)
 |-- previous_bookings_not_canceled: string (nullable = true)
 |-- avg_room_price: string (nullable = true)
 |-- special_requests: string (nullable = true)
 |-- status: string (nullable = true)



Все данные загрузились в формате `String`. Создадим единый столбец с датой, объединив значения из полей `date`,`month`,`year`. Далее зададим присвоим нужным полям корректные типы данных.

In [11]:
# Создаем единый столбец с датой
hotel = hotel.withColumn("dt", F.make_date(F.col("year"), F.col("month"), F.col("date")))

# Удаляем лишние столбцы
columns_to_drop = ["date", "month", "year"]
hotel = hotel.drop(*columns_to_drop)

In [12]:
hotel = hotel.select(
    F.col("ID").cast("string"),
    F.col("n_adults").cast("int"),
    F.col("n_children").cast("int"),
    F.col("weekend_nights").cast("int"),
    F.col("week_nights").cast("int"),
    F.col("meal_plan").cast("string"),
    F.col("car_parking_space").cast("byte"),
    F.col("room_type").cast("string"),
    F.col("dt").cast("date"),
    F.col("lead_time").cast("long"),
    F.col("market_segment").cast("string"),
    F.col("repeated_guest").cast("byte"),
    F.col("previous_cancellations").cast("int"),
    F.col("previous_bookings_not_canceled").cast("int"),
    F.col("avg_room_price").cast("double"),
    F.col("special_requests").cast("int"),
    F.col("status").cast("string"),
)

In [13]:
# Еще раз выводи схему
hotel.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: byte (nullable = true)
 |-- room_type: string (nullable = true)
 |-- dt: date (nullable = true)
 |-- lead_time: long (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: byte (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)



In [14]:
# Выводим 10 строк датафрейма
hotel.show(10)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+----------+---------+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|        dt|lead_time|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+----------+---------+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|2017-10-02|      224|       Offline|             0|                     0|                             0|          65.0|               0|Not_Cancel

Преобразования типов данных выполнены успешно.

Теперь создадим `calendar_dt` с диапазонами дат от 2017-01-01 по 2018-12-31.

In [15]:
# Создание calendar_dt
calendar_dt = spark.range(1).select(
    F.explode(
        F.sequence(
            F.to_date(F.lit("2017-01-01")),
            F.to_date(F.lit("2018-12-31")),
            F.expr("interval 1 day")
        )
    ).alias("c_dt")
)

calendar_dt.show(10)

+----------+
|      c_dt|
+----------+
|2017-01-01|
|2017-01-02|
|2017-01-03|
|2017-01-04|
|2017-01-05|
|2017-01-06|
|2017-01-07|
|2017-01-08|
|2017-01-09|
|2017-01-10|
+----------+
only showing top 10 rows



Датафрейм успешно создан.

Дополнительно удалим полные дубликаты, если они есть.

In [16]:
before_count = hotel.count() # Кол-во строк до удаления дубликатов
hotel = hotel.distinct()
after_count = hotel.count() # Кол-во строк после удаления дубликатов

print(f"Колчичество строк до удаления дубликатов: {before_count}, после: {after_count}")

Колчичество строк до удаления дубликатов: 36275, после: 36275


# Задание 1

Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [17]:
# Создаем новый столбец с общим количеством ночей
hotel = hotel.withColumn(
    "total_nights",
    F.col("weekend_nights") + F.col("week_nights")
)

# Выполняем агрегацию для подтвержденных бронирований
avg_nights_data = (
    hotel
    # Фильтруем только подтвержденные бронирования
    .filter(F.col("status") == "Not_Canceled")

    # Группируем по месяцам
    .groupBy(
        F.date_trunc("month", F.col("dt")).alias("month")
    )

    # Вычисляем среднее и округляем
    .agg(
        F.round(F.avg("total_nights"), 2).alias("avg_night_price")
    )

    # Упорядочиваем по месяцам
    .orderBy("month")

    # Убираем NULL значения
    .filter(F.col("month").isNotNull())
)

# Отображаем результат
avg_nights_data.show()

+-------------------+---------------+
|              month|avg_night_price|
+-------------------+---------------+
|2017-07-01 00:00:00|           3.02|
|2017-08-01 00:00:00|           2.72|
|2017-09-01 00:00:00|           2.66|
|2017-10-01 00:00:00|            2.7|
|2017-11-01 00:00:00|           2.72|
|2017-12-01 00:00:00|           3.04|
|2018-01-01 00:00:00|           2.74|
|2018-02-01 00:00:00|           2.68|
|2018-03-01 00:00:00|           3.04|
|2018-04-01 00:00:00|           2.92|
|2018-05-01 00:00:00|           2.81|
|2018-06-01 00:00:00|            2.6|
|2018-07-01 00:00:00|           3.19|
|2018-08-01 00:00:00|           3.15|
|2018-09-01 00:00:00|           2.79|
|2018-10-01 00:00:00|           2.89|
|2018-11-01 00:00:00|           2.98|
|2018-12-01 00:00:00|           3.25|
+-------------------+---------------+



# Задание 2

Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

In [18]:
# Считаем отмененные бронирования за 2018 год
cancellations_count = (
    hotel
    # Фильтруем по отмененным бронированиям и 2018 году
    .filter(
        (F.col("status") == "Canceled") &
        (F.year(F.col("dt")) == 2018)
    )
    # Группируем по месяцам
    .groupBy(
        F.date_trunc("month", F.col("dt")).alias("cancelation_month")
    )
    # Считаем количество отмененных бронирований
    .agg(
        F.count_distinct("ID").alias("cancelations_count")
    )
    # Убираем NULL значения
    .filter(F.col("cancelation_month").isNotNull())
)

# Считаем общее количество бронирований и присоединяем информацию о количестве отмененных
top_3_count = (
    hotel
    # Фильтруем бронирования за 2018 год
    .filter(F.year(F.col("dt")) == 2018)
    # Группируем все бронирования по месяцам
    .groupBy(
        F.date_trunc("month", F.col("dt")).alias("month")
    )
    .agg(
        F.count_distinct("ID").alias("bookings_total")
    )
    # Убираем NULL значения
    .filter(F.col("month").isNotNull())
    # Присоединяем данные об отменах
    .join(
        cancellations_count,
        on=F.col("cancelation_month") == F.col("month"),
        how="inner"
    )
    # Рассчитываем процент отмен
    .withColumn(
        "cancelations_percent",
        F.round(
            F.col("cancelations_count") / F.col("bookings_total") * 100,
            2
        )
    )
    # Сортируем по проценту отмен по убыванию
    .orderBy(F.col("cancelations_percent").desc())

    .limit(3)
)

# Показываем топ-3 месяца в 2018 году с наибольшим процентом отмен
top_3_count.select("cancelation_month", "cancelations_percent").show()

+-------------------+--------------------+
|  cancelation_month|cancelations_percent|
+-------------------+--------------------+
|2018-08-01 00:00:00|               46.55|
|2018-10-01 00:00:00|               46.36|
|2018-09-01 00:00:00|               45.78|
+-------------------+--------------------+



# Задание 3
Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.

In [19]:
avg_time = (
   hotel
    # Фильтруем по подтвержденным бронированиям
    .filter(
        (F.col("status") == "Not_Canceled")
    )
   # Группируем по месяцу
   .groupBy(
       F.date_trunc("month", F.col("dt")).alias("month")
   )
   # Находим среднее количество дней
   .agg(
       F.round(F.avg(F.col("lead_time"))).alias("avg_days")
   )
   #Убираем NULL значения
   .filter(F.col("month").isNotNull())

   # Сортируем по месяцу
   .orderBy("month")
)

avg_time.show()

+-------------------+--------+
|              month|avg_days|
+-------------------+--------+
|2017-07-01 00:00:00|   131.0|
|2017-08-01 00:00:00|    35.0|
|2017-09-01 00:00:00|    52.0|
|2017-10-01 00:00:00|    56.0|
|2017-11-01 00:00:00|    33.0|
|2017-12-01 00:00:00|    47.0|
|2018-01-01 00:00:00|    35.0|
|2018-02-01 00:00:00|    30.0|
|2018-03-01 00:00:00|    43.0|
|2018-04-01 00:00:00|    62.0|
|2018-05-01 00:00:00|    61.0|
|2018-06-01 00:00:00|    71.0|
|2018-07-01 00:00:00|    87.0|
|2018-08-01 00:00:00|    83.0|
|2018-09-01 00:00:00|    63.0|
|2018-10-01 00:00:00|    73.0|
|2018-11-01 00:00:00|    44.0|
|2018-12-01 00:00:00|    70.0|
+-------------------+--------+



# Задание 4
Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).

In [20]:
# Добавляем столбец с выручкой
hotel = (
    hotel
    .withColumn("revenue", F.col("total_nights") * F.col("avg_room_price"))
)

#Агрегируем данные
avg_revenue = (
    hotel
    # Берем только подтвержденные бронирования
   .filter(
        (F.col("status") == "Not_Canceled")
    )
   # Группируем по месяцу
    .groupBy(
        F.date_trunc("month", "dt").alias("month")
    )
    # Создаем pivot-table по типам бронирования
    .pivot("market_segment")
    # Находим среднюю выручку
    .agg(
        F.round(F.avg("revenue"),2).alias("avg_revenue")
    )
    # Убираем NULL значения
    .filter(
       (F.col("month").isNotNull())
    )
    .fillna(0)
    .orderBy("month")
)

avg_revenue.show()

+-------------------+--------+-------------+---------+-------+------+
|              month|Aviation|Complementary|Corporate|Offline|Online|
+-------------------+--------+-------------+---------+-------+------+
|2017-07-01 00:00:00|     0.0|         22.4|   113.75| 228.95|290.56|
|2017-08-01 00:00:00|     0.0|         0.32|   156.42| 235.54|284.21|
|2017-09-01 00:00:00|     0.0|        16.89|   177.83| 236.65|348.55|
|2017-10-01 00:00:00|     0.0|         1.09|   180.26| 223.24|311.47|
|2017-11-01 00:00:00|     0.0|        14.81|   102.97| 198.36|240.52|
|2017-12-01 00:00:00|     0.0|         0.25|   141.11| 253.86|258.93|
|2018-01-01 00:00:00|     0.0|         2.27|   113.03| 210.51|236.09|
|2018-02-01 00:00:00|   352.0|         1.12|   112.37| 253.51|234.31|
|2018-03-01 00:00:00|  118.33|        38.17|   142.39| 233.39|301.71|
|2018-04-01 00:00:00|  321.81|          0.0|   108.42| 236.44|320.08|
|2018-05-01 00:00:00|   262.5|          0.0|    229.5| 274.55|352.34|
|2018-06-01 00:00:00

# Задание 5

Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

In [21]:
# Задаем условие для оконной функции
window_agg = Window.partitionBy("repeated_guest")

# Агрегируем данные
top_5_guests = (
    hotel
    .withColumn("total_by_guest_type", F.sum("revenue").over(window_agg))\
    .withColumn("guest_revenue_percentage", F.round(F.col("revenue") / F.col("total_by_guest_type") * 100,4))
    .filter(F.col("repeated_guest") == 1)
    .orderBy(F.col("revenue").desc())
    .limit(5)
)

top_5_guests.select("ID","guest_revenue_percentage").show()

+--------+------------------------+
|      ID|guest_revenue_percentage|
+--------+------------------------+
|INN19235|                  1.5124|
|INN05222|                  0.5948|
|INN14189|                  0.5733|
|INN09923|                  0.5689|
|INN25479|                  0.5603|
+--------+------------------------+



# Задание 6
Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

In [22]:
# Рассчитываем количество гостей по дням
guest_count = (
    hotel
    .filter(F.col("status") == "Not_Canceled")
    .withColumn("guests_total", F.col("n_adults") + F.col("n_children"))
    .groupBy(
        F.date_trunc("day", "dt").alias("dt")
    )
    .agg(
        F.sum("guests_total").alias("guests_total")
    )
)

# Рассчитываем загрузку, включая те дни, когда отель пустует
load_stats = (
    calendar_dt
    .join(
        guest_count,
        on=F.col("c_dt") == F.col("dt"),
        how="left"
        )
    .withColumn(
        "load_percentage",
        F.round(F.col("guests_total") / 400 * 100,2)
        )
    .orderBy(F.col("dt").desc())
)

load_stats.select("c_dt","guests_total","load_percentage").show(10)

+----------+------------+---------------+
|      c_dt|guests_total|load_percentage|
+----------+------------+---------------+
|2018-12-31|          67|          16.75|
|2018-12-30|         166|           41.5|
|2018-12-29|         162|           40.5|
|2018-12-28|         134|           33.5|
|2018-12-27|         263|          65.75|
|2018-12-26|         117|          29.25|
|2018-12-25|          84|           21.0|
|2018-12-24|          98|           24.5|
|2018-12-23|         113|          28.25|
|2018-12-22|          89|          22.25|
+----------+------------+---------------+
only showing top 10 rows

