# Домашнее задание 5. Анализ данных на Spark SQL

### Цель домашней работы

Научиться работать с основными операторами Spark SQL, фильтровать таблицы по разным условиям, писать вложенные запросы, объединять таблицы.

### Условия выполнения задания

Дан csv-файл с логами отеля.

## Как выполнять задание

**Шаг 1. Создать таблицу, используя csv-файл.**   

**Шаг 2. Создать (сгенерировать) таблицу calendar, который будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.**   

**Шаг 3. Выполнить следующие запросы:**

- Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)
- Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.
- Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.
- Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).
- Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.
- Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.


In [1]:
#Начать выполнение задания тут
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.types import *  

In [2]:
# Так как я поставил Спарк на другую машину в локальной сети в контейнер,
# пришлось повозиться и руками перекинуть туда файл. Но всё запустилось.
spark = SparkSession.builder.remote("sc://192.168.1.55:15002").getOrCreate()
df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("file:///HW/Hotel.csv")
)
df.show(5)


+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|          65.0|       

In [3]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)



In [4]:
temp_df = spark.range(1).select(
    explode(
        sequence(
            to_date(lit("2017-01-01")), 
            to_date(lit("2018-12-31")), 
            expr("interval 1 day")
        )
    ).alias("real_date") # Временная колонка с "настоящей" датой
)

# 2. Превращаем дату в строку нужного формата 'yyyy-dd-MM'
calendar_df = temp_df.select(
    date_format("real_date", "yyyy-dd-MM").alias("calendar_dt")
)

calendar_df.show(3)
calendar_df.tail(3)


+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-02-01|
| 2017-03-01|
+-----------+
only showing top 3 rows



[Row(calendar_dt='2018-29-12'),
 Row(calendar_dt='2018-30-12'),
 Row(calendar_dt='2018-31-12')]

## Формат сдачи

**Как отправить задание на проверку.** Загрузите файл в GitHub, в форму приложите ссылку на него. Назовите файл своим ФИО.

**Что нужно отправить:** ссылку на репозиторий, в котором будет ноутбук Jupyter с решением.

## Место, где нужно выполнить задание



###  Выполнить следующие запросы:

Цель этого упражнения — закрепить навык выполнения запросов на выборку. Все запросы выполняются на базе `sample_mflix` с данными о кинофильмах.

#### Задача 1 (1 балл)

Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [5]:
# Ваш код здесь
df_clean = df.withColumn("full_date", make_date(col("year"), col("month"), col("date")))
df_clean = df_clean.withColumn(
    "total_nights", col("weekend_nights") + col("week_nights")
)
df_clean.show(3)


+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+----------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status| full_date|total_nights|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+----------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|    

In [6]:
# Ваш код здесь
df_confirmed = df_clean.filter(col("status") != "Canceled")

result = (
    df_confirmed.groupBy("year", "month")
    .agg(
        round(avg("total_nights"), 2).alias("avg_nights"),
    )
    .orderBy("year", "month")
)

result.show()


+----+-----+----------+
|year|month|avg_nights|
+----+-----+----------+
|2017|    7|      3.02|
|2017|    8|      2.72|
|2017|    9|      2.66|
|2017|   10|       2.7|
|2017|   11|      2.72|
|2017|   12|      3.04|
|2018|    1|      2.74|
|2018|    2|      2.69|
|2018|    3|      3.04|
|2018|    4|      2.92|
|2018|    5|      2.81|
|2018|    6|       2.6|
|2018|    7|      3.19|
|2018|    8|      3.15|
|2018|    9|      2.79|
|2018|   10|      2.89|
|2018|   11|      2.98|
|2018|   12|      3.25|
+----+-----+----------+



#### Задача 2 (1 балл)

Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

In [7]:
# Ваш код здесь
cancel_rate_df = (
    df_clean.filter(col("year") == 2018)
    .groupBy("month")
    .agg(
        sum(when(col("status") == "Canceled", 1).otherwise(0)).alias("canceled_count"),
        count("*").alias("total_count"),
    )
    .withColumn(
        "cancel_rate", round(col("canceled_count") / col("total_count") * 100, 2)
    )
    .orderBy(desc("cancel_rate"))
)

cancel_rate_df.select("month", "cancel_rate").show(3)


+-----+-----------+
|month|cancel_rate|
+-----+-----------+
|    8|      46.55|
|   10|      46.36|
|    9|      45.78|
+-----+-----------+
only showing top 3 rows



#### Задача 3 (1 балл)

Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.

In [8]:
# Ваш код здесь
result = (
    df_confirmed.groupBy("year", "month")
    .agg(round(avg("lead_time"), 2).alias("avg_lead_time"))
    .orderBy("year", "month")
)

result.show()


+----+-----+-------------+
|year|month|avg_lead_time|
+----+-----+-------------+
|2017|    7|       130.73|
|2017|    8|        35.08|
|2017|    9|        51.72|
|2017|   10|        55.89|
|2017|   11|        33.28|
|2017|   12|        46.75|
|2018|    1|        34.87|
|2018|    2|        30.53|
|2018|    3|        43.19|
|2018|    4|        62.49|
|2018|    5|        60.99|
|2018|    6|        70.64|
|2018|    7|        86.88|
|2018|    8|        83.09|
|2018|    9|        63.32|
|2018|   10|        73.24|
|2018|   11|        44.25|
|2018|   12|        69.75|
+----+-----+-------------+



#### Задача 4 (2 балла)

Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).

In [9]:
# Ваш код здесь
pivot_df = (
    df_confirmed.withColumn("total_price", col("total_nights") * col("avg_room_price")) 
    # считаю с учётом комментария из ответов на вопросы чата
    .groupBy("year", "month")
    .pivot("market_segment")
    .agg(round(sum("total_price"), 2))
    .orderBy("year", "month")
)
pivot_df.show()


+----+-----+--------+-------------+---------+---------+---------+
|year|month|Aviation|Complementary|Corporate|  Offline|   Online|
+----+-----+--------+-------------+---------+---------+---------+
|2017|    7|    NULL|       111.99|    455.0| 17628.91|  9878.98|
|2017|    8|    NULL|         12.0|   8446.5| 67599.17|128178.86|
|2017|    9|    NULL|        456.0|  21162.0|164236.86|218542.22|
|2017|   10|    NULL|         24.0|  18387.0|182385.17|208683.27|
|2017|   11|    NULL|        237.0|  8546.69| 54349.73| 59408.66|
|2017|   12|    NULL|          8.0| 13122.89| 91898.63|108489.75|
|2018|    1|    NULL|         68.0| 11303.01| 66101.66|128907.72|
|2018|    2|   352.0|         25.0| 22205.99| 85125.61|172359.58|
|2018|    3|   355.0|        878.0| 22213.62| 99659.13|316493.55|
|2018|    4|  6758.0|          0.0|   6396.8| 93629.08|401066.14|
|2018|    5|  2625.0|          0.0|  35113.4|118328.97|363962.27|
|2018|    6|  1235.0|          0.0| 12591.05|203092.63|333353.17|
|2018|    

#### Задача 5 (2 балла)

Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

In [10]:
# Ваш код здесь
guest_revenue = (
    df_clean.filter(col("repeated_guest") == 1)
    .groupBy("ID")
    .agg(
        round(sum(col("avg_room_price") * col("total_nights")), 2).alias(
            "total_price_guest"
        )
    )
)

total_sum = guest_revenue.agg(sum("total_price_guest")).collect()[0][0]

final_top_guests = (
    guest_revenue.withColumn(
        "part", round(col("total_price_guest") / lit(total_sum), 5)
    )
    .orderBy(desc("total_price_guest"))
    .limit(5)
)

final_top_guests.show()


+--------+-----------------+-------+
|      ID|total_price_guest|   part|
+--------+-----------------+-------+
|INN19235|           1754.4|0.01512|
|INN05222|            690.0|0.00595|
|INN14189|            665.0|0.00573|
|INN09923|            660.0|0.00569|
|INN25479|            650.0| 0.0056|
+--------+-----------------+-------+



#### Задача 6 (2 балла)

Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

In [11]:
calendar_df = calendar_df.withColumn("real_date", to_date(col("calendar_dt"), "yyyy-dd-MM"))

df_occupancy = (
    df_clean.filter(col("total_nights") > 0)
    .filter(col("status") != "Canceled")
    .withColumn(
        "guests_in_room",
        coalesce(col("n_adults"), lit(0)) + coalesce(col("n_children"), lit(0)),
    )
    .withColumn("end_date", date_add(col("full_date"), col("total_nights") - 1))
)

df_exploded = df_occupancy.select(
    col("guests_in_room"),
    explode(sequence(col("full_date"), col("end_date"), expr("interval 1 day"))).alias(
        "occupied_date"
    ),
)

daily_stat = (
    calendar_df.join(
        df_exploded, calendar_df.real_date == df_exploded.occupied_date, "left"
    )
    .groupBy("real_date")
    .agg(sum("guests_in_room").alias("guests_count"))
    .orderBy(desc("real_date"))
)

final_stat = daily_stat.fillna(0, subset=["guests_count"]).withColumn(
    "occupancy_rate", round((col("guests_count") / 400) * 100, 2)
)

final_stat.show(20)


+----------+------------+--------------+
| real_date|guests_count|occupancy_rate|
+----------+------------+--------------+
|2018-12-31|         562|         140.5|
|2018-12-30|         572|         143.0|
|2018-12-29|         542|         135.5|
|2018-12-28|         507|        126.75|
|2018-12-27|         552|         138.0|
|2018-12-26|         422|         105.5|
|2018-12-25|         397|         99.25|
|2018-12-24|         373|         93.25|
|2018-12-23|         341|         85.25|
|2018-12-22|         282|          70.5|
|2018-12-21|         247|         61.75|
|2018-12-20|         240|          60.0|
|2018-12-19|         228|          57.0|
|2018-12-18|         258|          64.5|
|2018-12-17|         274|          68.5|
|2018-12-16|         254|          63.5|
|2018-12-15|         170|          42.5|
|2018-12-14|         155|         38.75|
|2018-12-13|         153|         38.25|
|2018-12-12|         167|         41.75|
+----------+------------+--------------+
only showing top