In [132]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config("spark.sql.warehouse.dir", f"/user/itv008012/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [133]:
spark

In [134]:
from pyspark.sql.functions import *

In [135]:
hotel_schema = 'booking_id integer, guest_name string, checkin_date date, checkout_date date, type_of_room string, total_rent float'

In [136]:
hotels_df = spark.read \
.format("csv") \
.option("header", "false") \
.schema(hotel_schema) \
.load("/public/trendytech/datasets/hotel_data.csv")

In [137]:
hotels_df

booking_id,guest_name,checkin_date,checkout_date,type_of_room,total_rent
1,John Doe,2023-05-01,2023-05-05,Standard,400.0
2,Jane Smith,2023-05-02,2023-05-06,Deluxe,600.0
3,Mark Johnson,2023-05-03,2023-05-08,Standard,450.0
4,Sarah Wilson,2023-05-04,2023-05-07,Executive,750.0
5,Emily Brown,2023-05-06,2023-05-09,Deluxe,550.0
6,Michael Davis,2023-05-07,2023-05-10,Standard,400.0
7,Samantha Thompson,2023-05-08,2023-05-12,Deluxe,600.0
8,William Lee,2023-05-10,2023-05-13,Standard,450.0
9,Amanda Harris,2023-05-11,2023-05-16,Executive,750.0
10,David Rodriguez,2023-05-12,2023-05-15,Deluxe,550.0


In [138]:
hotels_df.printSchema()

root
 |-- booking_id: integer (nullable = true)
 |-- guest_name: string (nullable = true)
 |-- checkin_date: date (nullable = true)
 |-- checkout_date: date (nullable = true)
 |-- type_of_room: string (nullable = true)
 |-- total_rent: float (nullable = true)



In [139]:
hotel_new_df = hotels_df.select(count("*").alias("row_count"),countDistinct("guest_name").alias("Distinct_customers"),avg("total_rent").alias("Average_Rent"))

In [140]:
hotel_new_df

row_count,Distinct_customers,Average_Rent
107,23,546.2616822429907


In [141]:
#columnar expression

hotel1_df = hotels_df.selectExpr("count(*) as row_count","count(Distinct(guest_name)) as Distinct_guests","avg(total_rent) as Average_Rent")

In [142]:
hotel1_df

row_count,Distinct_guests,Average_Rent
107,23,546.2616822429907


In [143]:
hotels_df.createOrReplaceTempView("Hotels")

In [144]:
spark.sql(""" select type_of_room, ROUND(AVG(total_rent),2) as AVG_Price, MAX(total_rent) As MAximum_Price from hotels group by type_of_room """).show()

+------------+---------+-------------+
|type_of_room|AVG_Price|MAximum_Price|
+------------+---------+-------------+
|   Executive|    750.0|        750.0|
|      Deluxe|   575.58|        600.0|
|    Standard|    425.0|        450.0|
+------------+---------+-------------+



In [145]:
spark.sql(""" Select booking_id, Count(*) as reservation_counts, AVG(DATEDIFF(checkout_date,checkin_date)) As Average_Stay from Hotels group by booking_id order by booking_id""").show()

+----------+------------------+------------+
|booking_id|reservation_counts|Average_Stay|
+----------+------------------+------------+
|         1|                 1|         4.0|
|         2|                 1|         4.0|
|         3|                 1|         5.0|
|         4|                 1|         3.0|
|         5|                 1|         3.0|
|         6|                 1|         3.0|
|         7|                 1|         4.0|
|         8|                 1|         3.0|
|         9|                 1|         5.0|
|        10|                 1|         3.0|
|        11|                 1|         4.0|
|        12|                 1|         5.0|
|        13|                 1|         5.0|
|        14|                 1|         6.0|
|        15|                 1|         5.0|
|        16|                 1|         5.0|
|        17|                 1|         6.0|
|        18|                 1|         5.0|
|        19|                 1|         6.0|
|        2

In [146]:
spark.sql(""" Select booking_id, guest_name, date_format(checkin_date, 'MM') as Booking_month, type_of_room, total_Rent from Hotels""").show()

+----------+-----------------+-------------+------------+----------+
|booking_id|       guest_name|Booking_month|type_of_room|total_Rent|
+----------+-----------------+-------------+------------+----------+
|         1|         John Doe|           05|    Standard|     400.0|
|         2|       Jane Smith|           05|      Deluxe|     600.0|
|         3|     Mark Johnson|           05|    Standard|     450.0|
|         4|     Sarah Wilson|           05|   Executive|     750.0|
|         5|      Emily Brown|           05|      Deluxe|     550.0|
|         6|    Michael Davis|           05|    Standard|     400.0|
|         7|Samantha Thompson|           05|      Deluxe|     600.0|
|         8|      William Lee|           05|    Standard|     450.0|
|         9|    Amanda Harris|           05|   Executive|     750.0|
|        10|  David Rodriguez|           05|      Deluxe|     550.0|
|        11|     Linda Wilson|           05|    Standard|     400.0|
|        12|   Robert Johnson|    

In [176]:
hotelss_df = spark.sql(""" Select type_of_room as room, date_format(checkin_date, 'MM') as Booking_Month, count(Booking_id) as total_bookings, sum(total_rent) as Total_Price from Hotels Group by room,Booking_Month Order by room, Booking_Month""")

In [177]:
hotelss_df.show()

+---------+-------------+--------------+-----------+
|     room|Booking_Month|total_bookings|Total_Price|
+---------+-------------+--------------+-----------+
|   Deluxe|           05|             9|     5200.0|
|   Deluxe|           06|            10|     5750.0|
|   Deluxe|           07|            16|     9200.0|
|   Deluxe|           08|             8|     4600.0|
|Executive|           05|             5|     3750.0|
|Executive|           06|             5|     3750.0|
|Executive|           07|             6|     4500.0|
|Executive|           08|             4|     3000.0|
| Standard|           05|            10|     4250.0|
| Standard|           06|            10|     4250.0|
| Standard|           07|            16|     6800.0|
| Standard|           08|             8|     3400.0|
+---------+-------------+--------------+-----------+



In [149]:
# 3 ways:
#programatic, columnar, sql

In [150]:
from pyspark.sql import *

In [151]:
My_window = Window.partitionBy("room") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [152]:
neww_dataframe = hotelss_df.withColumn("running_total_booking", sum("count(Booking_id)").over(My_window)).withColumn("running_total_price", sum("Total_Price").over(My_window))

In [153]:
neww_dataframe.show()

+---------+-------------+-----------------+-----------+---------------------+-------------------+
|     room|Booking_Month|count(Booking_id)|Total_Price|running_total_booking|running_total_price|
+---------+-------------+-----------------+-----------+---------------------+-------------------+
|Executive|           05|                5|     3750.0|                    5|             3750.0|
|Executive|           06|                5|     3750.0|                   10|             7500.0|
|Executive|           07|                6|     4500.0|                   16|            12000.0|
|Executive|           08|                4|     3000.0|                   20|            15000.0|
|   Deluxe|           05|                9|     5200.0|                    9|             5200.0|
|   Deluxe|           06|               10|     5750.0|                   19|            10950.0|
|   Deluxe|           07|               16|     9200.0|                   35|            20150.0|
|   Deluxe|         

In [154]:
from pyspark.sql.functions import *

In [155]:
my_win = Window.partitionBy("room") \
.orderBy(desc("Total_Price")) 

In [158]:
new4_df = neww_dataframe.withColumn("rank",rank().over(my_win)).withColumn("Dense_rank",dense_rank().over(my_win)).withColumn("Row_Number",row_number().over(my_win))

In [159]:
new4_df


room,Booking_Month,count(Booking_id),Total_Price,running_total_booking,running_total_price,rank,Dense_rank,Row_Number
Executive,7,6,4500.0,16,12000.0,1,1,1
Executive,5,5,3750.0,5,3750.0,2,2,2
Executive,6,5,3750.0,10,7500.0,2,2,3
Executive,8,4,3000.0,20,15000.0,4,3,4
Deluxe,7,16,9200.0,35,20150.0,1,1,1
Deluxe,6,10,5750.0,19,10950.0,2,2,2
Deluxe,5,9,5200.0,9,5200.0,3,3,3
Deluxe,8,8,4600.0,43,24750.0,4,4,4
Standard,7,16,6800.0,36,15300.0,1,1,1
Standard,5,10,4250.0,10,4250.0,2,2,2


In [162]:
new4_df.createOrReplaceTempView("new_hotels")

In [167]:
hotel5_df = spark.sql(""" Select room, Booking_Month, Sum(Total_Price) as Total_amount from new_hotels group By 1,2""")

In [168]:
hotel5_df.show()

+---------+-------------+------------+
|     room|Booking_Month|Total_amount|
+---------+-------------+------------+
|   Deluxe|           05|      5200.0|
|   Deluxe|           06|      5750.0|
|   Deluxe|           07|      9200.0|
|   Deluxe|           08|      4600.0|
|Executive|           05|      3750.0|
|Executive|           06|      3750.0|
|Executive|           07|      4500.0|
|Executive|           08|      3000.0|
| Standard|           05|      4250.0|
| Standard|           06|      4250.0|
| Standard|           07|      6800.0|
| Standard|           08|      3400.0|
+---------+-------------+------------+



In [169]:
hotel_window = Window.partitionBy("room") \
.orderBy("Booking_Month") 

In [170]:
hotel6 = hotel5_df.withColumn("previous_month", lag("Total_amount").over(hotel_window))

In [171]:
hotel6.show()

+---------+-------------+------------+--------------+
|     room|Booking_Month|Total_amount|previous_month|
+---------+-------------+------------+--------------+
|Executive|           05|      3750.0|          null|
|Executive|           06|      3750.0|        3750.0|
|Executive|           07|      4500.0|        3750.0|
|Executive|           08|      3000.0|        4500.0|
|   Deluxe|           05|      5200.0|          null|
|   Deluxe|           06|      5750.0|        5200.0|
|   Deluxe|           07|      9200.0|        5750.0|
|   Deluxe|           08|      4600.0|        9200.0|
| Standard|           05|      4250.0|          null|
| Standard|           06|      4250.0|        4250.0|
| Standard|           07|      6800.0|        4250.0|
| Standard|           08|      3400.0|        6800.0|
+---------+-------------+------------+--------------+



In [175]:
checkin_month = ["05","06","07","08"]
summary = hotelss_df.groupBy("room").pivot("Booking_month",checkin_month).agg(expr("sum(Total_Price) as Something"))
summary.show(truncate = False)

+---------+------+------+------+------+
|room     |05    |06    |07    |08    |
+---------+------+------+------+------+
|Executive|3750.0|3750.0|4500.0|3000.0|
|Deluxe   |5200.0|5750.0|9200.0|4600.0|
|Standard |4250.0|4250.0|6800.0|3400.0|
+---------+------+------+------+------+



In [178]:
checkin = ["05","06","07","08"]
summary1 = hotelss_df.groupBy("room").pivot("Booking_month",checkin_month).agg(expr("sum(total_bookings) as Something"))
summary1.show(truncate = False)

+---------+---+---+---+---+
|room     |05 |06 |07 |08 |
+---------+---+---+---+---+
|Executive|5  |5  |6  |4  |
|Deluxe   |9  |10 |16 |8  |
|Standard |10 |10 |16 |8  |
+---------+---+---+---+---+

