### Инициализация SparkSession

In [51]:
import findspark
findspark.init()              

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("HW-3-Hotel")
    .getOrCreate()
)

spark.conf.set("spark.sql.session.timeZone", "UTC")  # фиксируем таймзону
print("Spark version:", spark.version)


Spark version: 3.5.5


### Данные

In [52]:
from pyspark.sql.functions import (
    col, lpad, concat_ws, to_date, when,
    sequence, explode, lit, avg, sum as _sum,
    count, round as _round
)
from pyspark.sql import functions as F


logs_hotel = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("data/Hotel.csv")
    .withColumn("total_nights", col("weekend_nights") + col("week_nights"))
    .withColumn("total_guests", col("n_adults") + col("n_children"))
    .withColumn(
        "stay_date",
        to_date(
            concat_ws(
                "-",
                col("year").cast("string"),
                lpad(col("month").cast("string"), 2, "0"),
                lpad(col("date").cast("string"), 2, "0"),
            ),
            "yyyy-MM-dd",
        )
    )
    .cache()
)

logs_hotel.printSchema()
logs_hotel.show(5)


root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- total_nights: integer (nullable = true)
 |-- total_guests: integer (nullable = true)
 |-- stay_date: date (nullable = true)

+--------+--------+-------

### Календарь

In [54]:
from datetime import date
from pyspark.sql.functions import date_add, lit

start_date = date(2017, 1, 1)
num_days   = (date(2018, 12, 31) - start_date).days

calendar_dt = (
    spark.range(num_days + 1)
         .select(
             date_add(lit(str(start_date)).cast("date"),
                      col("id").cast("int")
             ).alias("calendar_dt")
         )
)

calendar_dt.show(3)

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
+-----------+
only showing top 3 rows



In [30]:
# (a): среднее число ночей
avg_nights_df = logs_hotel.select(avg("total_nights").alias("avg_nights"))
avg_nights_df.show()


+------------------+
|        avg_nights|
+------------------+
|3.0150241212956583|
+------------------+



In [None]:
# b) Кол-во отмен каждый месяц в процентах % 
cancel_rate_df = (
    logs_hotel
        .groupBy("year", "month")
        .agg(
            F.round(F.sum(                             
                    F.when(F.lower("status") == "canceled", 1).otherwise(0)
                )
                / F.count("*") * 100,              
                2
            ).alias("cancel_pct")
        )
        .orderBy("year", "month")
)
cancel_rate_df.show()


+----+-----+----------+
|year|month|cancel_pct|
+----+-----+----------+
|2017|    7|     66.94|
|2017|    8|     18.24|
|2017|    9|     11.04|
|2017|   10|     15.79|
|2017|   11|      4.17|
|2017|   12|      2.37|
|2018|    1|      2.37|
|2018|    2|     25.23|
|2018|    3|     29.69|
|2018|    4|     36.37|
|2018|    5|     36.49|
|2018|    6|     40.31|
|2018|    7|     41.89|
|2018|    8|     46.55|
|2018|    9|     45.78|
|2018|   10|     46.36|
|2018|   11|     36.35|
|2018|   12|     18.16|
+----+-----+----------+



In [None]:
# c) Выручка 
revenue_df = (
    logs_hotel
        .filter(col("status") != "canceled")        # берём всё, что НЕ "canceled"
        .withColumn("booking_revenue",
                    col("avg_room_price") * col("total_nights"))
        .groupBy("year", "month", "market_segment")
        .agg(_sum("booking_revenue").alias("revenue"))
        .orderBy("year", "month", "market_segment")
)


revenue_df.show()

+----+-----+--------------+------------------+
|year|month|market_segment|           revenue|
+----+-----+--------------+------------------+
|2017|    7| Complementary|            111.99|
|2017|    7|     Corporate|             455.0|
|2017|    7|       Offline|          38860.41|
|2017|    7|        Online| 56796.39999999999|
|2017|    8| Complementary|              12.0|
|2017|    8|     Corporate|14237.099999999999|
|2017|    8|       Offline|          83050.67|
|2017|    8|        Online|168630.25000000006|
|2017|    9| Complementary|             456.0|
|2017|    9|     Corporate|           23242.0|
|2017|    9|       Offline|191068.39000000028|
|2017|    9|        Online|254403.98999999985|
|2017|   10| Complementary|              24.0|
|2017|   10|     Corporate|           20756.0|
|2017|   10|       Offline|221994.35000000012|
|2017|   10|        Online|         234931.62|
|2017|   11| Complementary|             237.0|
|2017|   11|     Corporate|           8868.69|
|2017|   11| 

In [None]:
# d) Среднее время между бр-м и заездом
lead_time_df = (
    logs_hotel
    .groupBy("year", "month")
    .agg(avg("lead_time").alias("avg_lead_time"))
    .orderBy("year", "month")
)
lead_time_df.show()


+----+-----+------------------+
|year|month|     avg_lead_time|
+----+-----+------------------+
|2017|    7|146.97796143250687|
|2017|    8| 42.25049309664694|
|2017|    9| 56.69254093389933|
|2017|   10| 66.25196027182436|
|2017|   11|34.425038639876355|
|2017|   12| 48.37823275862069|
|2018|    1| 34.89842209072978|
|2018|    2| 31.31338028169014|
|2018|    3| 47.13952502120441|
|2018|    4| 74.25255847953217|
|2018|    5| 84.63933795227098|
|2018|    6|100.41554792382142|
|2018|    7|111.91317950723504|
|2018|    8|115.65094676670239|
|2018|    9|119.36731937879811|
|2018|   10|124.78936545240893|
|2018|   11| 82.41191598799828|
|2018|   12| 87.50979455327281|
+----+-----+------------------+



In [58]:
# e) Кол-во гостей в день
per_day_guests_df = (
    logs_hotel
    .filter(col("stay_date").isNotNull())
    .groupBy("stay_date")
    .agg(_sum("total_guests").alias("guests"))
    .orderBy(col("guests").desc())
)
per_day_guests_df.show(50)


+----------+------+
| stay_date|guests|
+----------+------+
|2017-10-16|   441|
|2018-05-13|   412|
|2018-10-13|   393|
|2018-03-25|   385|
|2018-09-15|   369|
|2017-09-18|   366|
|2017-08-14|   365|
|2018-06-17|   362|
|2018-04-06|   358|
|2018-11-04|   343|
|2018-09-26|   342|
|2018-06-26|   326|
|2018-06-15|   326|
|2018-12-27|   325|
|2018-06-02|   318|
|2018-06-08|   309|
|2018-10-16|   305|
|2018-10-28|   303|
|2018-06-24|   299|
|2018-12-08|   298|
|2018-10-14|   288|
|2018-08-08|   287|
|2018-10-07|   286|
|2018-10-29|   286|
|2018-09-30|   285|
|2018-12-02|   284|
|2018-05-20|   284|
|2018-11-03|   283|
|2018-10-12|   282|
|2018-07-23|   277|
|2018-08-19|   276|
|2018-06-20|   275|
|2018-10-03|   271|
|2018-08-13|   268|
|2018-10-30|   268|
|2018-07-21|   268|
|2018-05-21|   267|
|2018-08-01|   266|
|2018-11-25|   266|
|2018-04-22|   264|
|2018-11-11|   262|
|2018-11-18|   260|
|2018-09-08|   257|
|2018-08-22|   255|
|2018-09-19|   252|
|2018-03-20|   252|
|2018-09-16|   250|


In [59]:
spark.stop()
