In [1]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session that every later cell works through.
spark = (
    SparkSession.builder
    .appName('Assigment4')
    .getOrCreate()
)

In [2]:
# Read the modified hotels CSV. The first line supplies the column names and
# the column types are inferred from the data.
base_df = spark.read.csv(
    'Hotels_data_Changed.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Peek at the first two rows to check the load and the column layout.
base_df.show(n=2)

+-----------+-------------------+-------------------+----+--------------+--------------+-------------+---------------+--------------------+-----------+-------+------------+-------------+-------+
|Snapshot ID|      Snapshot Date|       Checkin Date|Days|Original Price|Discount Price|Discount Code|Available Rooms|          Hotel Name|Hotel Stars|WeekDay|DiscountDiff| DiscountPerc|DayDiff|
+-----------+-------------------+-------------------+----+--------------+--------------+-------------+---------------+--------------------+-----------+-------+------------+-------------+-------+
|          1|2015-07-17 00:00:00|2015-08-12 00:00:00|   5|          1178|          1040|            1|              6|Best Western Plus...|          3|    Wed|         138| 11.714770798|     26|
|          1|2015-07-17 00:00:00|2015-08-19 00:00:00|   5|          1113|           982|            1|              8|Best Western Plus...|          3|    Wed|         131|11.7699910153|     33|
+-----------+------------

In [4]:
from pyspark.sql.functions import col

# Count rows per hotel, order by that count (largest first) and keep the first
# 150. head() returns a plain Python list of Row objects, not a DataFrame.
tophotels = (
    base_df
    .groupBy("Hotel Name")
    .count()
    .orderBy(col("count").desc())
    .head(150)
)
tophotels

[Row(Hotel Name='Newark Liberty International Airport Marriott', count=5346),
 Row(Hotel Name='Hilton Garden Inn Times Square', count=4892),
 Row(Hotel Name='Residence Inn Newark Elizabeth Liberty International Airport', count=4314),
 Row(Hotel Name='Westin New York at Times Square', count=3792),
 Row(Hotel Name='Loews Regency New York Hotel', count=3617),
 Row(Hotel Name='Viceroy New York', count=3565),
 Row(Hotel Name='Four Seasons Hotel New York', count=3243),
 Row(Hotel Name='Langham Place New York Fifth Avenue', count=3203),
 Row(Hotel Name='The Carlyle A Rosewood Hotel', count=3078),
 Row(Hotel Name='DoubleTree by Hilton Metropolitan - New York City', count=2866),
 Row(Hotel Name='Magnuson Convention Center Hotel', count=2862),
 Row(Hotel Name='Hilton Garden Inn New York West 35th Street', count=2822),
 Row(Hotel Name='Hilton Garden Inn New York-Times Square Central', count=2772),
 Row(Hotel Name='Conrad New York', count=2677),
 Row(Hotel Name='Wyndham Garden Brooklyn Sunset Park

In [5]:
# Turn the collected (name, count) rows back into a DataFrame and register it
# as a SQL view.
top_hotel_names_df = (
    spark.sparkContext
    .parallelize(tophotels)
    .toDF(['hotel_name', 'COUNT'])
)
top_hotel_names_df.createOrReplaceTempView("topHotelNames")

# 'Hotel Name' contains a space; expose the base data under an underscore name
# so the query below can reference the column directly.
(
    base_df
    .withColumnRenamed('Hotel Name', 'Hotel_Name')
    .createOrReplaceTempView("base_df")
)

# Keep only the rows that belong to one of the top 150 hotels.
top_hotels_filtered_base_df = spark.sql(
    "SELECT * FROM base_df WHERE Hotel_Name IN (SELECT hotel_name FROM topHotelNames)"
)
top_hotels_filtered_base_df.show(n=2)

+-----------+-------------------+-------------------+----+--------------+--------------+-------------+---------------+-------------+-----------+-------+------------+-------------+-------+
|Snapshot ID|      Snapshot Date|       Checkin Date|Days|Original Price|Discount Price|Discount Code|Available Rooms|   Hotel_Name|Hotel Stars|WeekDay|DiscountDiff| DiscountPerc|DayDiff|
+-----------+-------------------+-------------------+----+--------------+--------------+-------------+---------------+-------------+-----------+-------+------------+-------------+-------+
|        101|2015-08-16 00:00:00|2015-08-17 00:00:00|   5|          2055|          1989|            1|              1|Bentley Hotel|          4|    Mon|          66|3.21167883212|      1|
|        101|2015-08-16 00:00:00|2015-09-06 00:00:00|   5|          1409|          1348|            2|              3|Bentley Hotel|          4|    Sun|          61|4.32931156849|     21|
+-----------+-------------------+-------------------+----+--

In [6]:
# Within the top-150-hotel rows, find the 40 check-in dates with the most rows.
# head() collects them to the driver as a list of Row objects.
top_checkin_dates = (
    top_hotels_filtered_base_df
    .groupBy("Checkin Date")
    .count()
    .orderBy(col("count").desc())
    .head(40)
)

# Rebuild a DataFrame from the collected (date, count) rows and display all 40.
top_checkin_dates_df = (
    spark.sparkContext
    .parallelize(top_checkin_dates)
    .toDF()
)
top_checkin_dates_df.show(n=40)

+-------------------+-----+
|       Checkin Date|count|
+-------------------+-----+
|2015-11-11 00:00:00| 2302|
|2015-10-14 00:00:00| 1887|
|2015-11-04 00:00:00| 1885|
|2015-08-19 00:00:00| 1883|
|2015-10-28 00:00:00| 1861|
|2015-10-21 00:00:00| 1817|
|2015-11-06 00:00:00| 1808|
|2015-08-12 00:00:00| 1765|
|2015-11-05 00:00:00| 1684|
|2015-10-22 00:00:00| 1662|
|2015-11-12 00:00:00| 1649|
|2015-09-10 00:00:00| 1623|
|2015-10-29 00:00:00| 1623|
|2015-09-09 00:00:00| 1616|
|2015-11-18 00:00:00| 1582|
|2015-08-26 00:00:00| 1559|
|2015-11-10 00:00:00| 1548|
|2015-11-13 00:00:00| 1547|
|2015-10-15 00:00:00| 1473|
|2015-11-21 00:00:00| 1469|
|2015-09-30 00:00:00| 1464|
|2015-10-30 00:00:00| 1412|
|2015-09-16 00:00:00| 1407|
|2015-09-17 00:00:00| 1402|
|2015-11-28 00:00:00| 1383|
|2015-10-01 00:00:00| 1373|
|2015-11-26 00:00:00| 1356|
|2015-09-11 00:00:00| 1332|
|2015-09-18 00:00:00| 1326|
|2015-10-16 00:00:00| 1309|
|2015-11-27 00:00:00| 1306|
|2015-10-02 00:00:00| 1280|
|2015-10-07 00:00:00

In [7]:
# Replace the space in 'Checkin Date' so the column can be used in SQL queries.
top_checkin_dates_df = top_checkin_dates_df.withColumnRenamed(
    'Checkin Date',
    'Checkin_Date',
)
top_checkin_dates_df.show(n=3)

+-------------------+-----+
|       Checkin_Date|count|
+-------------------+-----+
|2015-11-11 00:00:00| 2302|
|2015-10-14 00:00:00| 1887|
|2015-11-04 00:00:00| 1885|
+-------------------+-----+
only showing top 3 rows



In [8]:
# Narrow the top-hotel rows down to the top check-in dates.

# View holding the 40 most frequent check-in dates.
top_checkin_dates_df.createOrReplaceTempView("topCheckinDates")

# View holding the top-hotel rows, with the space-containing column names
# swapped for underscore versions so the query can reference them.
(
    top_hotels_filtered_base_df
    .withColumnRenamed('Checkin Date', 'Checkin_Date')
    .withColumnRenamed('Discount Price', 'Discount_Price')
    .withColumnRenamed('Discount Code', 'Discount_Code')
    .createOrReplaceTempView("top_hotels_filtered_base")
)

# Keep only the four columns needed later, for rows whose check-in date is one
# of the top 40.
hotel_rows_for_top_dates = spark.sql(
    "SELECT Hotel_Name, Checkin_Date, Discount_Code, Discount_Price FROM top_hotels_filtered_base WHERE Checkin_Date IN (SELECT Checkin_Date FROM topCheckinDates)"
)

hotel_rows_for_top_dates.show()

+--------------------+-------------------+-------------+--------------+
|          Hotel_Name|       Checkin_Date|Discount_Code|Discount_Price|
+--------------------+-------------------+-------------+--------------+
|Westin New York a...|2015-11-26 00:00:00|            2|          1845|
|Westin New York a...|2015-11-26 00:00:00|            3|          1696|
|Westin New York a...|2015-11-26 00:00:00|            4|          1646|
|Westin New York a...|2015-11-26 00:00:00|            2|          1845|
|Westin New York a...|2015-11-26 00:00:00|            3|          1696|
|Westin New York a...|2015-11-26 00:00:00|            4|          1646|
|Westin New York a...|2015-11-26 00:00:00|            2|          1845|
|Westin New York a...|2015-11-26 00:00:00|            3|          1696|
|Westin New York a...|2015-11-26 00:00:00|            4|          1646|
|Westin New York a...|2015-11-26 00:00:00|            2|          1845|
|Westin New York a...|2015-11-26 00:00:00|            3|        

In [9]:
# Drop the row-count column from both lookup tables, leaving just the hotel
# names and just the check-in dates.
only_hotel_names_df, only_checkin_dates_df = (
    top_hotel_names_df.drop("COUNT"),
    top_checkin_dates_df.drop("COUNT"),
)

# Cartesian product: one row for every (hotel, check-in date) pairing.
joint_df = only_hotel_names_df.crossJoin(only_checkin_dates_df)

In [10]:
# Show two of the (hotel, check-in date) pairings.
joint_df.show(n=2)

+--------------------+-------------------+
|          hotel_name|       Checkin_Date|
+--------------------+-------------------+
|Newark Liberty In...|2015-11-11 00:00:00|
|Newark Liberty In...|2015-10-14 00:00:00|
+--------------------+-------------------+
only showing top 2 rows



In [11]:
# One-column table of discount codes; range(1, 5) produces the ids 1, 2, 3, 4.
discount_codes_df = (
    spark.range(1, 5)
    .withColumnRenamed("id", "discount_code")
)

# Every (hotel, check-in date) pairing combined with every discount code.
joint_with_price_codes_df = joint_df.crossJoin(discount_codes_df)

In [12]:
# Preview the combinations, then report how many there are
# (150 hotels x 40 dates x 4 codes = 24000).
joint_with_price_codes_df.show(n=5)
joint_with_price_codes_df.count()

+--------------------+-------------------+-------------+
|          hotel_name|       Checkin_Date|discount_code|
+--------------------+-------------------+-------------+
|Newark Liberty In...|2015-11-11 00:00:00|            1|
|Newark Liberty In...|2015-11-11 00:00:00|            2|
|Newark Liberty In...|2015-11-11 00:00:00|            3|
|Newark Liberty In...|2015-11-11 00:00:00|            4|
|Newark Liberty In...|2015-10-14 00:00:00|            1|
+--------------------+-------------------+-------------+
only showing top 5 rows



24000

In [13]:
# Collapse duplicate listings: for each (hotel, check-in date, discount code)
# combination keep only the lowest discount price.
grouped_df = (
    hotel_rows_for_top_dates
    .groupBy("Hotel_Name", "Checkin_Date", "Discount_Code")
    .min("Discount_Price")
)

In [14]:
# Lower-case the key columns and give the aggregate column a plain name so the
# rows can later be read as row.hotel_name, row.discount_price, and so on.
grouped_renamed_df = (
    grouped_df
    .withColumnRenamed("Hotel_Name", "hotel_name")
    .withColumnRenamed("Checkin_Date", "checkin_date")
    .withColumnRenamed("Discount_Code", "discount_code")
    .withColumnRenamed("min(Discount_Price)", "discount_price")
)

In [15]:
# Inspect the first 20 minimum-price rows.
grouped_renamed_df.show(n=20)

+--------------------+-------------------+-------------+--------------+
|          hotel_name|       checkin_date|discount_code|discount_price|
+--------------------+-------------------+-------------+--------------+
|Westin New York a...|2015-09-11 00:00:00|            3|          1759|
|Homewood Suites b...|2015-08-12 00:00:00|            1|          1195|
|New York Marriott...|2015-08-12 00:00:00|            2|          1275|
|    Viceroy New York|2015-11-13 00:00:00|            2|          1822|
|Omni Berkshire Place|2015-08-27 00:00:00|            2|          1156|
|     The Plaza Hotel|2015-08-27 00:00:00|            3|          2969|
|Hampton Inn Manha...|2015-11-03 00:00:00|            3|          1255|
|     The Plaza Hotel|2015-11-03 00:00:00|            4|          4088|
| The Kitano New York|2015-09-30 00:00:00|            2|          1893|
|Smyth A Thompson ...|2015-09-30 00:00:00|            3|          2525|
|Hilton Garden Inn...|2015-09-30 00:00:00|            1|        

In [16]:
# Every (check-in date, discount code) pair; each pair becomes one feature
# column of the hotel price matrix.
final_columns_df = (
    top_checkin_dates_df
    .select("checkin_date")
    .crossJoin(discount_codes_df.select("discount_code"))
)

In [17]:
# Preview the pairs and report how many there are (40 dates x 4 codes = 160).
final_columns_df.show(n=2)
final_columns_df.count()

+-------------------+-------------+
|       checkin_date|discount_code|
+-------------------+-------------+
|2015-11-11 00:00:00|            1|
|2015-11-11 00:00:00|            2|
+-------------------+-------------+
only showing top 2 rows



160

In [18]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Number the (check-in date, discount code) pairs 1..N in sorted order. The ID
# is used further down as the pair's (1-based) column position in the matrix.
w = Window.orderBy("checkin_date", "discount_code")
indexed_final_columns_df = final_columns_df.select(
    row_number().over(w).alias("ID"),
    col("*"),
)

indexed_final_columns_df.show(n=180)

+---+-------------------+-------------+
| ID|       checkin_date|discount_code|
+---+-------------------+-------------+
|  1|2015-08-12 00:00:00|            1|
|  2|2015-08-12 00:00:00|            2|
|  3|2015-08-12 00:00:00|            3|
|  4|2015-08-12 00:00:00|            4|
|  5|2015-08-13 00:00:00|            1|
|  6|2015-08-13 00:00:00|            2|
|  7|2015-08-13 00:00:00|            3|
|  8|2015-08-13 00:00:00|            4|
|  9|2015-08-19 00:00:00|            1|
| 10|2015-08-19 00:00:00|            2|
| 11|2015-08-19 00:00:00|            3|
| 12|2015-08-19 00:00:00|            4|
| 13|2015-08-26 00:00:00|            1|
| 14|2015-08-26 00:00:00|            2|
| 15|2015-08-26 00:00:00|            3|
| 16|2015-08-26 00:00:00|            4|
| 17|2015-08-27 00:00:00|            1|
| 18|2015-08-27 00:00:00|            2|
| 19|2015-08-27 00:00:00|            3|
| 20|2015-08-27 00:00:00|            4|
| 21|2015-08-28 00:00:00|            1|
| 22|2015-08-28 00:00:00|            2|


In [19]:
# Number the hotels 1..N in alphabetical order. The ID is used further down as
# the hotel's (1-based) row position in the matrix.
w = Window.orderBy("hotel_name")
indexed_hotels_df = only_hotel_names_df.select(
    row_number().over(w).alias("ID"),
    col("*"),
)

indexed_hotels_df.show(n=170)

+---+--------------------+
| ID|          hotel_name|
+---+--------------------+
|  1|        Aloft Harlem|
|  2|Andaz 5th Avenue ...|
|  3|Andaz Wall Street...|
|  4|Baccarat Hotel an...|
|  5|       Bentley Hotel|
|  6|Best Western Bays...|
|  7|Best Western Bowe...|
|  8|Best Western Plus...|
|  9|Best Western Plus...|
| 10|    Blakely New York|
| 11|Cassa Hotel 45th ...|
| 12|         Chelsea Inn|
| 13|Comfort Inn Times...|
| 14|     Conrad New York|
| 15|Courtyard New Yor...|
| 16|Courtyard Newark ...|
| 17|Courtyard by Marr...|
| 18|Courtyard by Marr...|
| 19|Courtyard by Marr...|
| 20|Courtyard by Marr...|
| 21|Courtyard by Marr...|
| 22|Crowne Plaza Time...|
| 23|Days Inn Bronx Ne...|
| 24|DoubleTree Suites...|
| 25|DoubleTree by Hil...|
| 26|DoubleTree by Hil...|
| 27|DoubleTree by Hil...|
| 28|DoubleTree by Hil...|
| 29|DoubleTree by Hil...|
| 30|Dumont NYC-an Aff...|
| 31|Embassy Suites Ne...|
| 32|Eventi Hotel a Ki...|
| 33|Fairfield Inn by ...|
| 34|Four Seasons Hote...|
|

In [None]:
def initMat(x, y):
    """Return the placeholder (-1) stored in a matrix cell before it is filled.

    Both coordinates are accepted but ignored: every cell starts out the same.
    """
    return -1


# Price matrix: 150 rows by 161 columns, every cell starting at the -1
# placeholder. Each row is its own list object.
finalMat = []
for row_idx in range(150):
    finalMat.append([initMat(col_idx, row_idx) for col_idx in range(161)])

In [None]:
# Number of (hotel, check-in date, discount code) groups that have a price.
grouped_renamed_df.count()

In [None]:
# Lookup table: hotel name -> its 1-based ID from indexed_hotels_df.
hotel_dict = {
    hotel.hotel_name: hotel.ID
    for hotel in indexed_hotels_df.collect()
}

In [None]:
# Lookup table: (check-in date, discount code) -> its 1-based ID from
# indexed_final_columns_df.
combinations_dict = {
    (pair.checkin_date, pair.discount_code): pair.ID
    for pair in indexed_final_columns_df.collect()
}

In [None]:
# Write each group's minimum discount price into the matrix cell addressed by
# (hotel, (check-in date, discount code)). Cells without a matching group keep
# their initial placeholder value.
# The per-row debug print was removed: it emitted one line for every grouped
# row and buried the notebook output.
for row in grouped_renamed_df.collect():
    # The lookup IDs start at 1, while list indices start at 0.
    column_to_update = combinations_dict[row.checkin_date, row.discount_code] - 1
    row_to_update = hotel_dict[row.hotel_name] - 1
    finalMat[row_to_update][column_to_update] = row.discount_price

In [None]:
# Label every matrix row: write the hotel's name into column index 160 of the
# row addressed by the hotel's 1-based ID.
for hotel in indexed_hotels_df.collect():
    matrix_row = finalMat[hotel.ID - 1]
    matrix_row[160] = hotel.hotel_name

In [None]:
# Wrap the raw (not yet normalised) matrix in a Spark DataFrame for inspection.
mat_df = spark.createDataFrame(data=finalMat)

In [None]:
# Display the raw matrix (show() with its default row limit).
mat_df.show()

In [None]:
# Rescale each hotel's known prices to the 0-100 range, row by row:
#     normalised = 100 * (price - row_min) / (row_max - row_min)
# Cells still holding the -1 placeholder (no price) are left untouched, and a
# row whose known prices are all equal is set to 0.
#
# Changes from the earlier version of this cell:
#   * the loop variables are no longer called `row` / `col`; `col` shadowed
#     pyspark.sql.functions.col, breaking any earlier cell re-run afterwards;
#   * row min/max come from min()/max() over the known prices instead of the
#     0 / 100000 sentinels, which were wrong for prices >= 100000;
#   * `100.0 *` forces true division on every Python version.
for row_idx in range(150):
    hotel_row = finalMat[row_idx]

    # Only the first 160 columns hold prices; index 160 is the hotel name.
    known_prices = [price for price in hotel_row[:160] if price > -1]
    if not known_prices:
        # No prices at all for this hotel: nothing to normalise.
        continue

    row_min = min(known_prices)
    row_max = max(known_prices)
    price_span = row_max - row_min

    for col_idx in range(160):
        if hotel_row[col_idx] > -1:
            if price_span > 0:
                hotel_row[col_idx] = 100.0 * (hotel_row[col_idx] - row_min) / price_span
            else:
                hotel_row[col_idx] = 0

In [None]:
# Dump every price cell of the matrix, one value per line, for a manual check.
# The loop variables are deliberately not named `row` / `col`: `col` would
# shadow pyspark.sql.functions.col and break earlier cells on re-run.
for row_idx in range(0, 150):
    for col_idx in range(0, 160):
        print(finalMat[row_idx][col_idx])

In [None]:
# Convert every price cell to float so each column holds a single numeric type
# before the matrix is turned into a DataFrame (index 160, the hotel name, is
# not touched).
# The loop variables are deliberately not named `row` / `col`: `col` would
# shadow pyspark.sql.functions.col and break earlier cells on re-run.
for row_idx in range(0, 150):
    for col_idx in range(0, 160):
        finalMat[row_idx][col_idx] = float(finalMat[row_idx][col_idx])

In [None]:
# Wrap the normalised matrix in a Spark DataFrame.
normalized_mat_df = spark.createDataFrame(data=finalMat)

In [None]:
# Preview the first six normalised hotel rows.
normalized_mat_df.show(n=6)

In [None]:
# Write the normalised matrix to task4.csv through pandas. The file contains
# data rows only: no index column and no header line.
normalized_mat_df.toPandas().to_csv(
    'task4.csv',
    index=False,
    header=False,
)

In [None]:
# Read the exported matrix back. task4.csv is written WITHOUT a header line,
# so it must be read with header=False; header=True would consume the first
# hotel's row as column names and silently drop that hotel from the data.
# The 161 columns (160 prices plus the hotel name) are then renamed _1 .. _161
# so that the '_<n>' feature-column names used for clustering resolve.
df = (
    spark.read.csv("task4.csv", header=False, inferSchema=True)
    .toDF(*['_' + str(i) for i in range(1, 162)])
)

In [None]:
# Show the first row of the re-loaded matrix.
df.head()

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

# Names of the 160 price columns: '_1' .. '_160'.
feat_cols = ['_%d' % i for i in range(1, 161)]

# Pack the price columns into a single vector column called 'features'.
vec_assembler = VectorAssembler(
    inputCols=feat_cols,
    outputCol='features',
)
final_data = vec_assembler.transform(df)

# Scaler reading 'features' and writing 'scaledFeatures', set up with
# withStd=True and withMean=False.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False,
)