# __Домашнее задание 5. Анализ данных на Spark SQL__

In [1]:
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
# Inject a CSS rule that sets elements with class "container" to 90% width.
# `display` and `HTML` are taken from the public IPython.display module;
# importing `display` from IPython.core.display is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [3]:
import findspark
import pyspark
import pyspark.sql.functions as F
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, to_date, when
from pyspark.sql.types import *


In [4]:
# Let findspark locate the Spark installation, then create (or reuse) the session.
findspark.init()

# Session options, applied to the builder in the order listed.
spark_settings = {
    'spark.ui.port': '9311',
    'spark.executor.memoryOverhead': '1G',
    'spark.shuffle.service.enabled': 'true',
    'spark.dynamicAllocation.enabled': 'true',
    'spark.driver.extraClassPath': '/opt/spark/jars/sqljdbc42.jar',
    'spark.executor.extraClassPath': '/opt/spark/jars/sqljdbc42.jar',
}

session_builder = SparkSession.builder.appName('Test_01')
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

1. __Создание таблицы с логами__

In [5]:
# Explicit schema for the hotel bookings data: (column name, Spark type) pairs
# in declaration order. Every column is declared nullable.
hotel_columns = [
    ("ID", StringType()),
    ("n_adults", IntegerType()),
    ("n_children", IntegerType()),
    ("weekend_nights", IntegerType()),
    ("week_nights", IntegerType()),
    ("meal_plan", StringType()),
    ("car_parking_space", IntegerType()),
    ("room_type", StringType()),
    ("lead_time", IntegerType()),
    ("year", IntegerType()),
    ("month", IntegerType()),
    ("date", IntegerType()),
    ("market_segment", StringType()),
    ("repeated_guest", IntegerType()),
    ("previous_cancellations", IntegerType()),
    ("previous_bookings_not_canceled", IntegerType()),
    ("avg_room_price", DoubleType()),
    ("special_requests", IntegerType()),
    ("status", StringType()),
]

schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in hotel_columns
])

In [6]:
# Read the bookings CSV with the explicit schema; the file's first line is
# treated as a header row.
hotel_csv_reader = spark.read.option("header", "true").schema(schema)
logs_hotel_df = hotel_csv_reader.csv("/content/Hotel.csv")

In [7]:
# Drop rows dated 29 February 2018: 2018 is not a leap year, so these rows
# cannot be converted to a real calendar date.
is_feb_29_2018 = (col("year") == 2018) & (col("month") == 2) & (col("date") == 29)
logs_hotel_df = logs_hotel_df.filter(~is_feb_29_2018)

In [8]:
# Glue year, month and day into "year-month-day" text and parse it into a
# proper date column named full_date.
date_text = concat(col("year"), lit("-"), col("month"), lit("-"), col("date"))
logs_hotel_df = logs_hotel_df.withColumn("full_date", to_date(date_text, "yyyy-M-d"))

In [9]:
# Preview the first 10 rows without truncating cell contents.
logs_hotel_df.show(n=10, truncate=False)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+----------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |full_date |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+----------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                   

In [10]:
# Print the column names and types of the prepared DataFrame.
logs_hotel_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- full_date: date (nullable = true)



In [11]:
# Persist the prepared bookings as the table "logs_hotel", replacing any
# existing table of that name, so the SQL queries can read it.
logs_hotel_writer = logs_hotel_df.write.mode("overwrite")
logs_hotel_writer.saveAsTable("logs_hotel")

2. __Создание таблицы с календарем__

In [12]:
# Build a one-column DataFrame (calendar_dt) holding one row per day:
# sequence(...) produces the dates from 2017-01-01 through 2018-12-31 in
# 1-day steps, and explode(...) turns that array into individual rows.
calendar_df = spark.sql("""
    SELECT
        explode(
            sequence(
                to_date('2017-01-01'),
                to_date('2018-12-31'),
                interval 1 day
            )
        ) as calendar_dt
""")

In [13]:
# Preview the first 10 calendar dates.
# The table itself is written by the dedicated save cell that follows, so the
# identical overwrite that used to run here was redundant and has been removed.
calendar_df.show(10)

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
| 2017-01-06|
| 2017-01-07|
| 2017-01-08|
| 2017-01-09|
| 2017-01-10|
+-----------+
only showing top 10 rows


In [14]:
# Persist the calendar as the table "calendar", replacing any existing table
# of that name.
calendar_writer = calendar_df.write.mode("overwrite")
calendar_writer.saveAsTable("calendar")

3.1. __Среднее количество ночей по месяцам и годам__

In [26]:
# Per (year, month), over bookings whose status is 'Not_Canceled':
#   avg_nights         - mean of weekend_nights + week_nights, rounded to 2 decimals
#   confirmed_bookings - number of such bookings
# Rows are ordered chronologically by year, then month.
query1 = spark.sql("""
    SELECT
        year,
        month,
        ROUND(AVG(weekend_nights + week_nights), 2) as avg_nights,
        COUNT(*) as confirmed_bookings
    FROM logs_hotel
    WHERE status = 'Not_Canceled'
    GROUP BY year, month
    ORDER BY year, month
""")

query1.show()

+----+-----+----------+------------------+
|year|month|avg_nights|confirmed_bookings|
+----+-----+----------+------------------+
|2017|    7|      3.02|               120|
|2017|    8|      2.72|               829|
|2017|    9|      2.66|              1467|
|2017|   10|       2.7|              1611|
|2017|   11|      2.72|               620|
|2017|   12|      3.04|               906|
|2018|    1|      2.74|               990|
|2018|    2|      2.68|              1244|
|2018|    3|      3.04|              1658|
|2018|    4|      2.92|              1741|
|2018|    5|      2.81|              1650|
|2018|    6|       2.6|              1912|
|2018|    7|      3.19|              1486|
|2018|    8|      3.15|              1496|
|2018|    9|      2.79|              1606|
|2018|   10|      2.89|              1826|
|2018|   11|      2.98|              1485|
|2018|   12|      3.25|              1713|
+----+-----+----------+------------------+



3.2. __TOP-3 месяца по проценту отмен за 2018 год__

In [37]:
# For 2018 only. The monthly_stats CTE counts, per month, all bookings and
# those whose status is 'Canceled'. The outer query turns that into a
# cancellation percentage (rounded to 2 decimals) and keeps the 3 months with
# the highest percentage.
query2 = spark.sql("""
    WITH monthly_stats AS (
        SELECT
            month,
            COUNT(*) as total_bookings,
            SUM(CASE WHEN status = 'Canceled' THEN 1 ELSE 0 END) as canceled_bookings
        FROM logs_hotel
        WHERE year = 2018
        GROUP BY month
    )
    SELECT
        month,
        total_bookings,
        canceled_bookings,
        ROUND((canceled_bookings * 100.0 / total_bookings), 2) as cancellation_rate_percent
    FROM monthly_stats
    ORDER BY cancellation_rate_percent DESC
    LIMIT 3
""")

query2.show()

+-----+--------------+-----------------+-------------------------+
|month|total_bookings|canceled_bookings|cancellation_rate_percent|
+-----+--------------+-----------------+-------------------------+
|    8|          2799|             1303|                    46.55|
|   10|          3404|             1578|                    46.36|
|    9|          2962|             1356|                    45.78|
+-----+--------------+-----------------+-------------------------+



3.3. __Среднее время между бронированием и заездом__

In [38]:
# Per (year, month), over bookings whose status is 'Not_Canceled': the mean of
# lead_time rounded to 2 decimals, ordered chronologically.
# NOTE(review): lead_time is presumably the number of days between booking and
# arrival (the alias says "days") - confirm against the dataset description.
query3 = spark.sql("""
    SELECT
        year,
        month,
        ROUND(AVG(lead_time), 2) as avg_lead_time_days
    FROM logs_hotel
    WHERE status = 'Not_Canceled'
    GROUP BY year, month
    ORDER BY year, month
""")

query3.show()

+----+-----+------------------+
|year|month|avg_lead_time_days|
+----+-----+------------------+
|2017|    7|            130.73|
|2017|    8|             35.08|
|2017|    9|             51.72|
|2017|   10|             55.89|
|2017|   11|             33.28|
|2017|   12|             46.75|
|2018|    1|             34.87|
|2018|    2|             30.27|
|2018|    3|             43.19|
|2018|    4|             62.49|
|2018|    5|             60.99|
|2018|    6|             70.64|
|2018|    7|             86.88|
|2018|    8|             83.09|
|2018|    9|             63.32|
|2018|   10|             73.24|
|2018|   11|             44.25|
|2018|   12|             69.75|
+----+-----+------------------+



3.4. __Сводная таблица по выручке__

In [41]:
# Step 1 (SQL): for bookings whose status is 'Not_Canceled', compute per
# (market_segment, year, month) the mean of avg_room_price * total nights,
# rounded to 2 decimals.
query4 = spark.sql("""
    SELECT
        market_segment,
        year,
        month,
        ROUND(AVG(avg_room_price * (weekend_nights + week_nights)), 2) as avg_revenue
    FROM logs_hotel
    WHERE status = 'Not_Canceled'
    GROUP BY market_segment, year, month
""")

# Step 2: pivot years into columns; each cell is the mean of that segment's
# monthly avg_revenue values for the year.
pivot_query = (
    query4.groupBy("market_segment")
    .pivot("year")
    .agg({"avg_revenue": "avg"})
    .orderBy("market_segment")
)

# Step 3: round every year column to 2 decimals.
# F.round (module alias from the notebook's import cell) is used instead of
# `from pyspark.sql.functions import round`, which would silently replace
# Python's built-in round() for every cell executed afterwards.
for column in pivot_query.columns:
    if column != "market_segment":
        pivot_query = pivot_query.withColumn(column, F.round(col(column), 2))

pivot_query.show()

+--------------+------+------+
|market_segment|  2017|  2018|
+--------------+------+------+
|      Aviation|  NULL|246.12|
| Complementary|  9.29|  6.79|
|     Corporate|145.39|153.83|
|       Offline|229.43|263.87|
|        Online|289.04|330.73|
+--------------+------+------+



3.5. __TOP-5 постоянных гостей по выручке__

In [44]:
# guest_revenue: for rows with repeated_guest = 1, sum avg_room_price * total
#   nights per ID.
# total_revenue_repeated: a single-row grand total of those sums.
# Final select: the 5 IDs with the largest total, plus each one's share of the
#   grand total in percent (both rounded to 2 decimals). The CROSS JOIN simply
#   attaches the single grand-total row to every guest row.
# NOTE(review): there is no filter on status here, so canceled bookings also
#   contribute to the totals - confirm this is intended.
# NOTE(review): ID values look like per-booking identifiers (e.g. INN19235);
#   if so, each "guest" is a single booking - verify against the dataset.
query5 = spark.sql("""
    WITH guest_revenue AS (
        SELECT
            ID as guest_id,
            SUM(avg_room_price * (weekend_nights + week_nights)) as total_revenue
        FROM logs_hotel
        WHERE repeated_guest = 1
        GROUP BY ID
    ),
    total_revenue_repeated AS (
        SELECT SUM(total_revenue) as total_revenue_all
        FROM guest_revenue
    )
    SELECT
        gr.guest_id,
        ROUND(gr.total_revenue,2) as total_revenue,
        ROUND((gr.total_revenue * 100.0 / tr.total_revenue_all), 2) as revenue_percentage
    FROM guest_revenue gr
    CROSS JOIN total_revenue_repeated tr
    ORDER BY gr.total_revenue DESC
    LIMIT 5
""")

query5.show()

+--------+-------------+------------------+
|guest_id|total_revenue|revenue_percentage|
+--------+-------------+------------------+
|INN19235|       1754.4|              1.53|
|INN05222|        690.0|               0.6|
|INN14189|        665.0|              0.58|
|INN09923|        660.0|              0.57|
|INN25479|        650.0|              0.57|
+--------+-------------+------------------+



3.6. __Количество гостей и загрузка отеля по дням__

In [31]:
# daily_guests: for bookings whose status is 'Not_Canceled', sum
#   n_adults + n_children per full_date.
# all_days: LEFT JOIN from the calendar table so that dates without any
#   matching booking still appear, with guests_count = 0.
# Final select: guests_count and guests_count / 400 as a percentage, newest
#   date first.
# NOTE(review): guests are attributed only to full_date (the date built from
#   the year/month/date columns); they are not spread over the
#   weekend_nights + week_nights of the stay. If full_date is the arrival date,
#   this counts arrivals per day rather than guests present - confirm intent.
# NOTE(review): 400 is a hard-coded capacity; its meaning (rooms vs. guests)
#   is not stated in this notebook - confirm.
# NOTE(review): the BETWEEN bounds equal the range the calendar was generated
#   with, so the WHERE clause presumably filters nothing - verify.
query6 = spark.sql("""
    WITH daily_guests AS (
        SELECT
            full_date as stay_date,
            SUM(n_adults + n_children) as guests_count
        FROM logs_hotel
        WHERE status = 'Not_Canceled'
        GROUP BY full_date
    ),
    all_days AS (
        SELECT
            c.calendar_dt,
            COALESCE(dg.guests_count, 0) as guests_count
        FROM calendar c
        LEFT JOIN daily_guests dg ON c.calendar_dt = dg.stay_date
        WHERE c.calendar_dt BETWEEN '2017-01-01' AND '2018-12-31'
    )
    SELECT
        calendar_dt,
        guests_count,
        ROUND((guests_count * 100.0 / 400), 2) as occupancy_percentage
    FROM all_days
    ORDER BY calendar_dt DESC
""")

query6.show()

+-----------+------------+--------------------+
|calendar_dt|guests_count|occupancy_percentage|
+-----------+------------+--------------------+
| 2018-12-31|          67|               16.75|
| 2018-12-30|         166|               41.50|
| 2018-12-29|         162|               40.50|
| 2018-12-28|         134|               33.50|
| 2018-12-27|         263|               65.75|
| 2018-12-26|         117|               29.25|
| 2018-12-25|          84|               21.00|
| 2018-12-24|          98|               24.50|
| 2018-12-23|         113|               28.25|
| 2018-12-22|          89|               22.25|
| 2018-12-21|          91|               22.75|
| 2018-12-20|          64|               16.00|
| 2018-12-19|          68|               17.00|
| 2018-12-18|          83|               20.75|
| 2018-12-17|          82|               20.50|
| 2018-12-16|         124|               31.00|
| 2018-12-15|          53|               13.25|
| 2018-12-14|          44|              

In [45]:
# Stop the Spark session now that all queries have been run.
spark.stop()