In [55]:
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SQLContext, functions, types
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql import Row
from geopy.geocoders import Nominatim

In [11]:
sc=pyspark.SparkContext(appName="project")
spark = SQLContext(sc)

In [12]:
h_df = pd.read_json('dataset/hotel_info.json')
h1_df = spark.createDataFrame(h_df)
h1_df.createOrReplaceTempView('h1_df')

temp=spark.sql("SELECT df.id FROM (SELECT id, COUNT(*) as tot_count FROM h1_df GROUP BY id ORDER BY tot_count DESC) df WHERE df.tot_count>1")
temp.createOrReplaceTempView('temp')
del_dup = spark.sql("SELECT h1_df.* FROM h1_df LEFT JOIN temp ON h1_df.id == temp.id WHERE temp.id IS NULL")
del_dup.createOrReplaceTempView('del_dup')

del_dup = del_dup.withColumn("amenities", functions.split(del_dup["amenities"], ",").cast("array<string>"))
del_dup.createOrReplaceTempView('del_dup')

newh_df  = spark.sql("SELECT id,explode(amenities) as amenities FROM del_dup")
newh_df.createOrReplaceTempView('newh_df')

newh1_df  = spark.sql("SELECT amenities,COUNT(amenities) AS tot_count FROM newh_df GROUP BY amenities ORDER BY tot_count DESC")
newh1_df.createOrReplaceTempView('newh1_df')

# spark.sql("SELECT address FROM del_dup").head()
del_dup.show()

+--------------------+--------------------+-------+----------------+--------------------+------------+----+--------------------+-----+
|             address|           amenities|country|hotel_experience|          hotel_name|hotel_rating|  id|            location|price|
+--------------------+--------------------+-------+----------------+--------------------+------------+----+--------------------+-----+
|4359 Main St Whis...|                [[]]| Canada|            null|Summit Lodge Bout...|         NaN|  26|(50.1171903, -122...|  NaN|
|15 rue Sault-au-M...|[['Free High Spee...| Canada|       Excellent|     Hotel Le Priori|         4.5|  29|(52.4760892, -71....|159.0|
|4801 Chemin St-Cy...|[['Pool',  'Free ...| Canada|       Very good|Blueberry Lake Re...|         4.0| 474|(52.4760892, -71....|  NaN|
|161 Lower Ganges ...|[['Free Internet'...| Canada|         Average|Ganges Marina and...|         2.5| 964|(48.8533169, -123...|  NaN|
|760 Arthur St W T...|[['Pool',  'Free ...| Canada|    

In [13]:
amenities_pref = [" 'Non-smoking hotel'"," 'Air conditioning'"," 'Free parking'"," 'Pets Allowed ( Dog / Pet Friendly )'"," 'Free High Speed Internet (WiFi)'"]

pa_df = pd.DataFrame(amenities_pref,columns=["amenities_pref"])

a_df = spark.createDataFrame(pa_df)
a_df.createOrReplaceTempView('a_df')

newa_df  = spark.sql("SELECT * FROM newh_df INNER JOIN a_df WHERE newh_df.amenities=a_df.amenities_pref")

ameni_comb = newa_df.groupBy(functions.col("id")).agg(functions.collect_list(functions.col("amenities")).alias("amenities"))
amenities_len=ameni_comb.withColumn("ameni_len",functions.size(ameni_comb["amenities"])).orderBy(functions.col("ameni_len"), ascending=False)
amenities_len.createOrReplaceTempView("amenities_len")

ameni_df = spark.sql("SELECT a.id,h.amenities,a.ameni_len FROM del_dup h INNER JOIN amenities_len a WHERE h.id=a.id ORDER BY a.ameni_len DESC")
ameni_df.show()

+----+--------------------+---------+
|  id|           amenities|ameni_len|
+----+--------------------+---------+
|  34|[['Pool',  'Free ...|        5|
|2489|[['Pool',  'Free ...|        5|
| 155|[['Pool',  'Free ...|        5|
|4161|[['Pool',  'Free ...|        5|
|1711|[['Pool',  'Free ...|        5|
|5208|[['Pool',  'Free ...|        5|
|1882|[['Restaurant',  ...|        5|
|3061|[['Restaurant',  ...|        5|
|1055|[['Pool',  'Free ...|        5|
|5871|[['Restaurant',  ...|        5|
|  22|[['Pool',  'Room ...|        5|
|1241|[['Pool',  'Free ...|        5|
|5846|[['Pool',  'Free ...|        5|
| 385|[['Pool',  'Free ...|        5|
|5915|[['Pool',  'Room ...|        5|
|4519|[['Pool',  'Room ...|        5|
| 938|[['Restaurant',  ...|        5|
|2906|[['Pool',  'Free ...|        5|
|3069|[['Pool',  'Free ...|        5|
|1480|[['Pool',  'Free ...|        5|
+----+--------------------+---------+
only showing top 20 rows



In [14]:
def get_rating(x):
    val = x / 5
    if x >= 0 and x <= val:
        return 1
    elif x > val and x <= 2*val:
        return 2
    elif x > 2*val and x <= 3*val:
        return 3
    elif x > 3*val and x <= 4*val:
        return 4
    else:
        return 5

find_rating = functions.udf(lambda a: get_rating(a), types.IntegerType())

usr_rating = ameni_df.withColumn("rating",find_rating("ameni_len"))
# usr_rating.show()

In [15]:
df = pd.read_json('dataset/reviews.json')
# df=df.iloc[:500,:]

# df["user_id"]=df.user_name.astype("category").cat.codes
df["att_id"]=df.id.astype('category').cat.codes
# df.head()

rev_df = spark.createDataFrame(df)
rev_df.createOrReplaceTempView('rev_df')

rev_temp=spark.sql("SELECT df.id FROM (SELECT id, COUNT(*) as tot_count FROM rev_df GROUP BY id ORDER BY tot_count DESC) df WHERE df.tot_count>1")
rev_temp.createOrReplaceTempView('rev_temp')

s_df = spark.sql("SELECT rev_df.* FROM rev_df LEFT JOIN rev_temp ON rev_df.id == rev_temp.id WHERE rev_temp.id IS NULL")
s_df.createOrReplaceTempView('s_df')

# s_df.select("user_name").distinct().count()

#String Indexing user_name 
indexer = StringIndexer(inputCol="user_name", outputCol="user_id")
indexed = indexer.fit(s_df).transform(s_df)
u_id_df = indexed.withColumn("user_id",indexed["user_id"].cast("Int"))
u_id_df.createOrReplaceTempView('u_id_df')

uid_count = u_id_df.select("user_id").distinct().count()

# spark.sql("SELECT price FROM s_df").show()
# String Indexing id 
# str_in = StringIndexer(inputCol="id", outputCol="att_id",stringOrderType='frequencyAsc')
# att_indexed = str_in.fit(u_id_df).transform(u_id_df)
# att_id_df = att_indexed.withColumn("att_id",indexed["id"].cast("Int"))
# att_id_df.createOrReplaceTempView('att_id_df')

In [16]:
usrid_df = usr_rating.withColumn("usr_id", functions.lit(uid_count)).join(u_id_df.select(["id","att_id"]), "id")
# usrid_df.show()

usrid_final_df = usrid_df.select(usrid_df["usr_id"].alias("user_id"),usrid_df["att_id"].alias("att_id"),usrid_df["rating"].alias("user_rating"))

org_df = u_id_df.select("user_id","att_id","user_rating")

(usrid_s1, usrid_s2) = usrid_final_df.randomSplit([0.1,0.9])

comb_df = org_df.union(usrid_s1)
# comb_df.show()

# print(usrid_final_df.count())

In [17]:
(training,validation) = comb_df.randomSplit([0.8,0.2])
# training.show()

In [18]:
ranks=[4,8,12]
error = 20000
for i in ranks:
    als = ALS(maxIter=5,regParam=0.01,rank=i,userCol="user_id",itemCol="att_id",ratingCol="user_rating",coldStartStrategy="drop")
    model = als.fit(training)
    predictions = model.transform(validation)
    evaluator = RegressionEvaluator(metricName="rmse",labelCol="user_rating",predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    if rmse < error:
        model.write().overwrite().save("model_file")
        print("rank : ",i)
        error = rmse        
print("RMSE:" +str(rmse))

rank :  4
rank :  8
rank :  12
RMSE:5.227490484714508


In [19]:
comb_df.createOrReplaceTempView('comb_df')
# new_df  = spark.sql("SELECT DISTINCT att_id FROM comb_df WHERE user_id != ").withColumn('user_id', functions.lit(13))

als_model = ALSModel.load("model_file")

user = usrid_s2.select("user_id").distinct()
recomm = als_model.recommendForUserSubset(user,50)
recomm.createOrReplaceTempView('recomm')

recomm_df  = spark.sql("SELECT user_id,explode(recommendations) as recommendations FROM recomm")
recomm_df.createOrReplaceTempView('recomm_df')

# als_model.recommendForUserSubset().show()
# predictions = als_model.transform(new_df)
# predictions.show()
# als_model.rank

# spark.sql("SELECT amenities FROM h1_df WHERE id=1886").head()

In [20]:
get_attid = recomm_df.withColumn("att_id",functions.col("recommendations.att_id")).withColumn("rating",functions.col("recommendations.rating"))
get_attid.createOrReplaceTempView("get_attid")

In [21]:
get_attid.show()
u_id_df.show()

+-------+-----------------+------+---------+
|user_id|  recommendations|att_id|   rating|
+-------+-----------------+------+---------+
|    111|[1008, 5.0016727]|  1008|5.0016727|
|    111| [1886, 5.000766]|  1886| 5.000766|
|    111|[1892, 5.0007486]|  1892|5.0007486|
|    111|[1776, 5.0002627]|  1776|5.0002627|
|    111| [1882, 5.000224]|  1882| 5.000224|
|    111| [4409, 4.999789]|  4409| 4.999789|
|    111| [2173, 4.999357]|  2173| 4.999357|
|    111| [2403, 4.994175]|  2403| 4.994175|
|    111| [3638, 4.994175]|  3638| 4.994175|
|    111| [4428, 4.994175]|  4428| 4.994175|
|    111| [3797, 4.994175]|  3797| 4.994175|
|    111| [4263, 4.994175]|  4263| 4.994175|
|    111| [233, 2.8649304]|   233|2.8649304|
|    111| [2539, 2.763423]|  2539| 2.763423|
|    111|[2544, 2.7600944]|  2544|2.7600944|
|    111|[4305, 2.4885278]|  4305|2.4885278|
|    111|[4299, 2.0183158]|  4299|2.0183158|
|    111|[2179, 1.9328883]|  2179|1.9328883|
|    111|  [617, 1.635478]|   617| 1.635478|
|    111|[

In [22]:
u_tempdf = spark.sql("SELECT u_id_df.id FROM u_id_df INNER JOIN get_attid on u_id_df.att_id=get_attid.att_id")
u_tempdf.createOrReplaceTempView('u_tempdf')

In [23]:
hotel_df = spark.sql("SELECT h1_df.* FROM h1_df INNER JOIN u_tempdf ON h1_df.id = u_tempdf.id")

In [24]:
hotel_df.show()
# hotel_df.createOrReplaceTempView("hotel_df")
# spark.sql("select address from hotel_df").head(10)

+--------------------+--------------------+-------+----------------+--------------------+------------+----+--------------------+-----+
|             address|           amenities|country|hotel_experience|          hotel_name|hotel_rating|  id|            location|price|
+--------------------+--------------------+-------+----------------+--------------------+------------+----+--------------------+-----+
|315 route 132 Es ...|                  []| Canada|            null|Camp de Base Coin...|         NaN|2570|(52.4760892, -71....|  NaN|
|444 chemin La Bal...|['Free parking', ...| Canada|       Very good|Hotel Cap-aux-Pie...|         3.5|5888|(52.4760892, -71....|  NaN|
|32 Rte 111 E La S...|['Restaurant', 'B...| Canada|       Very good|Motel Villa mon r...|         3.5|6043|(48.8017705, -79....|  NaN|
|650 King St E Gan...|                  []| Canada|            null|Quality Inn & Sui...|         NaN|4034|(44.3369303319149...|  NaN|
|2960 King St E Ki...|['Free parking', ...| Canada|    

In [25]:
user_location = 'ontario'
hotel_df = hotel_df.withColumn("address",functions.lower(functions.col("address")))
hotel_sugg = hotel_df.where(hotel_df.address.contains(user_location))
hotel_sugg.show()

+--------------------+--------------------+-------+----------------+--------------------+------------+----+--------------------+-----+
|             address|           amenities|country|hotel_experience|          hotel_name|hotel_rating|  id|            location|price|
+--------------------+--------------------+-------+----------------+--------------------+------------+----+--------------------+-----+
|650 king st e gan...|                  []| Canada|            null|Quality Inn & Sui...|         NaN|4034|(44.3369303319149...|  NaN|
|2960 king st e ki...|['Free parking', ...| Canada|       Very good|Radisson Hotel Ki...|         4.0|4034|(43.4292532, -80....|  NaN|
|38 lakeport rd st...|['Restaurant', 'N...| Canada|       Very good|    Harbourfront Inn|         3.5|3485|(43.202697, -79.2...|  NaN|
|western universit...|                  []| Canada|            null|Western Summer Ac...|         NaN|1815|(43.01047375, -81...|  NaN|
|1090 shamrock roa...|['Pool', 'Free pa...| Canada|    

In [64]:
prices = [i[0] for i in hotel_sugg.select("price").dropna().collect()]
if(len(prices) == 0):
    over_prices = [i[0] for i in hotel_df.select("price").dropna().collect()]
    avg_price = sum(over_prices)/len(over_prices)
else:
    avg_price = sum(prices)/len(prices)
avg_price_df = hotel_sugg.withColumn('price',functions.when(functions.isnan(functions.col("price")), functions.lit(avg_price)).otherwise(functions.col("price")))
avg_price_df.dropna()#.show()
avg_price_df.createOrReplaceTempView("avg_price_df")
spark.sql("select location from avg_price_df").head(5)
# avg_price_df.printSchema()

[Row(location='(44.3369303319149, -76.1513494617021)'),
 Row(location='(43.4292532, -80.4334451413477)'),
 Row(location='(43.202697, -79.265495)'),
 Row(location='(43.01047375, -81.2698784900773)'),
 Row(location='(42.9226806, -81.2275131)')]

In [86]:
geolocator = Nominatim(user_agent="hotel_recomm")

def get_cname(x):
    str_split = x.split(",")
    location = geolocator.reverse(str_split[0][1:]+', '+str_split[1][:-1])
    if 'city' in location.raw["address"]:
        return location.raw["address"]["city"]
    else:
        return location.raw["address"]["town"]
    
get_city = functions.udf(lambda a:get_cname(a),types.StringType())

city_df = avg_price_df.withColumn("city",get_city(functions.col("location")))

# location = geolocator.reverse("43.4292532, -80.4334451413477")
# if "city" in location.raw["address"]:
#     print (True)

In [89]:
city_df.show()

+--------------------+--------------------+-------+----------------+--------------------+------------+----+--------------------+----------------+------------------+
|             address|           amenities|country|hotel_experience|          hotel_name|hotel_rating|  id|            location|           price|              city|
+--------------------+--------------------+-------+----------------+--------------------+------------+----+--------------------+----------------+------------------+
|650 king st e gan...|                  []| Canada|            null|Quality Inn & Sui...|         NaN|4034|(44.3369303319149...|81.9090909090909|         Gananoque|
|2960 king st e ki...|['Free parking', ...| Canada|       Very good|Radisson Hotel Ki...|         4.0|4034|(43.4292532, -80....|81.9090909090909|         Kitchener|
|38 lakeport rd st...|['Restaurant', 'N...| Canada|       Very good|    Harbourfront Inn|         3.5|3485|(43.202697, -79.2...|81.9090909090909|    St. Catharines|
|western u