-
Notifications
You must be signed in to change notification settings - Fork 0
/
Spark Code - Vacation Planner
311 lines (205 loc) · 14.3 KB
/
Spark Code - Vacation Planner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
%pyspark
# Load the ratings dataset into Spark.
# Each row is one user's rating of one hotel; a user can appear on many rows.
# The schema-inference option must be spelled "inferSchema": Spark silently
# ignores unknown CSV options, so the previous misspelling ("infeschema")
# caused every column to be read as a string.
person_hotel = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("project/person_hotel_long_new_two.csv")
)
# `_c0` is the name Spark assigns to a column whose header cell is blank
# (presumably a saved row index -- confirm against the CSV); it is not needed.
person_hotel = person_hotel.drop('_c0')
z.show(person_hotel)
%pyspark
# Collapse the star rating into a binary "liked" flag to keep the model simple:
# a rating above 3 becomes 1 (like); a rating of 3 or below becomes 0.
# The flag is added as a new column, User_rating_summ.
person_hotel.createOrReplaceTempView("person_hotel")
binarize_sql = "SELECT *, case when User_rating > 3 then 1 else 0 end as User_rating_summ FROM person_hotel"
person_hotel = spark.sql(binarize_sql)
%pyspark
# Cast the user id, hotel id and binary rating to integers: later calculations
# and the ALS model expect integer-typed values in these columns.
from pyspark.sql.types import IntegerType
for int_col in ("Person_ID", "Hotel_ID", "User_rating_summ"):
    person_hotel = person_hotel.withColumn(int_col, person_hotel[int_col].cast(IntegerType()))
%pyspark
# Inspect the (column name, type) pairs of person_hotel to verify the integer
# casts applied to Person_ID, Hotel_ID and User_rating_summ.
person_hotel.dtypes
%pyspark
# First run of the algorithm on the available data, used only to gauge how
# well the model performs before scoring any new user.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import isnan

# Hold out 20% of the ratings for evaluation.
(training, test) = person_hotel.randomSplit([0.8, 0.2])
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="Person_ID", itemCol="Hotel_ID", ratingCol="User_rating_summ")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="User_rating_summ",
                                predictionCol="prediction")
# ALS yields NaN for users/hotels that ended up only in the test split; those
# rows would turn the RMSE itself into NaN. Exclude them with an explicit
# isnan() test rather than comparing the float column to the string 'NaN',
# which only worked through implicit string-to-double coercion.
rmse = evaluator.evaluate(predictions.filter(~isnan(predictions.prediction)))
print("Root-mean-square error = " + str(rmse))
# Generate top 10 hotel recommendations for each user
#userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each hotel
# hotelRecs = model.recommendForAllItems(10)
%pyspark
# QC check: count the rows in the predictions table.
# Registering the view also makes `predictions` queryable from Spark SQL under
# the name "predictions".
predictions.createOrReplaceTempView("predictions")
spark.sql("select * from predictions ").count()
%pyspark
# First run of the model for a new user.
# Load the ratings file supplied by the user who wants recommendations.
# "inferSchema" was previously misspelled ("infeschema"); Spark ignores
# unknown CSV options, so the file was being read with all-string columns.
test_inp = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("project/test.csv")
)
z.show(test_inp)
%pyspark
# Cast the new user's id, hotel id and binary rating to integers so they match
# the training data and satisfy the ALS model's integer requirement.
for int_col in ("Person_ID", "Hotel_ID", "User_rating_summ"):
    test_inp = test_inp.withColumn(int_col, test_inp[int_col].cast(IntegerType()))
%pyspark
# Combine the new user's ratings with the ratings already in the system; the
# merged set is what the model is trained on.
# The union matches columns by position, so the two literal 1s stand in for
# the third and fourth columns of person_hotel_1.
test_inp.createOrReplaceTempView("test_inp")
person_hotel.createOrReplaceTempView("person_hotel_1")
merge_sql = "select * from person_hotel_1 union select Person_ID, Hotel_ID, 1, 1, User_rating_summ from test_inp"
person_hotel_model = spark.sql(merge_sql)
%pyspark
# To compute the top 10 recommendations for user 10001 we need one row per
# hotel the user has NOT rated yet; the model then predicts a score for each.
# "inferSchema" was previously misspelled ("infeschema") and therefore ignored.
test_predictions = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("project/test_predictions.csv")
)
test_predictions.createOrReplaceTempView("test_predictions_1")
# Anti-join: keep only candidate hotels with no matching row in test_inp.
test_predictions_reco = spark.sql("select a.* from test_predictions_1 a left join test_inp b on a.Hotel_ID = b.Hotel_ID where b.Hotel_ID is null")
test_predictions_reco.show()
%pyspark
# Same integer conversion as for the ratings data, so the candidate rows can be
# fed to the model.
for int_col in ("Person_ID", "Hotel_ID", "User_rating_summ"):
    test_predictions_reco = test_predictions_reco.withColumn(
        int_col, test_predictions_reco[int_col].cast(IntegerType())
    )
%pyspark
# Load the hotel master data (types inferred from the CSV) and discard the
# 'Unnamed: 0' column, which is not used.
HOTEL_SCHEMA = (
    spark.read.format("csv")
    .options(header="true", inferschema="true")
    .load("project/hotel_schema_new.csv")
    .drop('Unnamed: 0')
)
z.show(HOTEL_SCHEMA)
%pyspark
# Make the hotel key an integer so it joins cleanly against Hotel_ID.
from pyspark.sql.types import IntegerType
hotel_id_as_int = HOTEL_SCHEMA["HOTEL_ID_NEW"].cast(IntegerType())
HOTEL_SCHEMA = HOTEL_SCHEMA.withColumn("HOTEL_ID_NEW", hotel_id_as_int)
%pyspark
# Show the ratings user 10001 supplied, enriched with each hotel's name,
# location, categories and type.
predictions.createOrReplaceTempView("predictions")
HOTEL_SCHEMA.createOrReplaceTempView("HOTEL_SCHEMA_1")
user_input_sql = "SELECT A.*, B.NAME, B.CITY, B.STATE, B.NEW_CATEGORIES, B.Type FROM test_inp A INNER JOIN HOTEL_SCHEMA_1 B ON A.Hotel_ID = B.HOTEL_ID_NEW"
user_input = spark.sql(user_input_sql)
user_input.show()
%pyspark
# Helper to run the model on future test data.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row


def ALS_recom(train_data, test_data, max_iter=5, reg_param=0.01):
    """Fit an ALS recommender on ``train_data`` and score ``test_data``.

    Args:
        train_data: DataFrame with ``Person_ID``, ``Hotel_ID`` and
            ``User_rating_summ`` columns used to fit the model.
        test_data: DataFrame with ``Person_ID`` and ``Hotel_ID`` columns to
            score.
        max_iter: Number of ALS iterations (defaults to the original 5).
        reg_param: ALS regularization parameter (defaults to the original 0.01).

    Returns:
        ``test_data`` with an added ``prediction`` column. Rows whose user or
        hotel was absent from ``train_data`` are dropped instead of being
        returned with a NaN prediction.
    """
    als = ALS(
        maxIter=max_iter,
        regParam=reg_param,
        userCol="Person_ID",
        itemCol="Hotel_ID",
        ratingCol="User_rating_summ",
        # Spark orders NaN above every other number, so unscorable rows would
        # otherwise rise to the top of an "order by prediction desc" ranking.
        coldStartStrategy="drop",
    )
    model = als.fit(train_data)
    return model.transform(test_data)
%pyspark
# Helper that turns the model output of ALS_recom into a top-10 list.
def top10_reco(predictions):
    """Return the 10 highest-scoring predictions joined with hotel details.

    Registers ``predictions`` as the temp view "predictions" and the
    module-level ``HOTEL_SCHEMA`` as "HOTEL_SCHEMA_1" (replacing any existing
    views with those names), then inner-joins them on the hotel id.

    Args:
        predictions: DataFrame with ``Hotel_ID`` and ``prediction`` columns.

    Returns:
        DataFrame with every column of ``predictions`` plus NAME, CITY, STATE,
        NEW_CATEGORIES and Type, sorted by ``prediction`` descending and
        limited to 10 rows.
    """
    predictions.createOrReplaceTempView("predictions")
    HOTEL_SCHEMA.createOrReplaceTempView("HOTEL_SCHEMA_1")
    ranking_sql = (
        "SELECT A.*, B.NAME, B.CITY, B.STATE, B.NEW_CATEGORIES, B.Type "
        "FROM predictions A INNER JOIN HOTEL_SCHEMA_1 B ON A.Hotel_ID = B.HOTEL_ID_NEW "
        "order by prediction desc limit 10"
    )
    return spark.sql(ranking_sql)
%pyspark
# INPUT
# Display the new user's supplied ratings alongside each hotel's name,
# location and type.
HOTEL_SCHEMA.createOrReplaceTempView("HOTEL_SCHEMA_1")
input_sql = "SELECT A.*, B.NAME, B.CITY, B.STATE, B.Type FROM test_inp A INNER JOIN HOTEL_SCHEMA_1 B ON A.Hotel_ID = B.HOTEL_ID_NEW"
input_with_hotels = spark.sql(input_sql)
input_with_hotels.show()
%pyspark
# OUTPUT
# Train on the merged ratings, score the user's unrated hotels, keep the ten
# best and print the columns of interest.
pred = ALS_recom(person_hotel_model, test_predictions_reco)
top10_pred = top10_reco(pred)
display_cols = ['Person_ID', 'Hotel_ID', 'prediction', 'NAME', 'CITY', 'STATE', 'Type']
top10_pred.select(*display_cols).show()
%pyspark
# Load the raw airfare data, discard rows without a fare, and keep the
# 2012-2017 window (both bounds inclusive) as airfares_recent.
airfares = (
    spark.read.format("csv")
    .options(header="true", inferschema="true")
    .load("project/air_raw.csv")
)
airfares = airfares.where(airfares.fare.isNotNull())
airfares_recent = airfares.where(airfares.Year.between(2012, 2017))
z.show(airfares_recent)
%pyspark
# Convert the fare strings (e.g. "$123.45") to floats in a new fare_num column.
import pyspark.sql.functions as F
# Strip the currency symbol and any thousands separators before casting.
# Splitting on '$' and taking the second piece returned null for values such
# as "$1,234.56" (the comma breaks the cast) or values with no '$' at all.
fare_digits = F.regexp_replace(airfares_recent['fare'], '[$,]', '')
airfares_recent = airfares_recent.withColumn('fare_num', fare_digits.cast('float'))
%pyspark
# Average fare per year: the 2012-2017 window is used because the fares in
# those years are very similar to one another.
airfares_recent.groupBy('Year').avg('fare_num').show()
%pyspark
# Integrity check: number of airfare rows available for each year.
airfares_recent.groupBy('Year').count().show()
%pyspark
# Break the airfare endpoint strings into separate city and state columns.
# city1 is the departure endpoint (dep_*), city2 the arrival endpoint (arr_*).
for endpoint_col, prefix in (('city1', 'dep'), ('city2', 'arr')):
    # Text before the first comma is the city name.
    comma_parts = F.split(airfares_recent[endpoint_col], ',')
    airfares_recent = airfares_recent.withColumn(prefix + '_city', comma_parts.getItem(0))
    # The remainder is split on spaces and the token at index 1 is kept as the
    # state (presumably index 0 is empty because of the space after the comma
    # -- confirm against the data).
    space_parts = F.split(comma_parts.getItem(1), ' ')
    airfares_recent = airfares_recent.withColumn(prefix + '_state', space_parts.getItem(1))
z.show(airfares_recent)
%pyspark
# Load the customer master data with inferred column types.
demog_reader = spark.read.format("csv").options(header="true", inferschema="true")
demog = demog_reader.load("project/demog_data.csv")
z.show(demog)
%pyspark
# Attach the customer master record to each top-10 prediction (matched on the
# user id) so that user details and the user's home location travel with the
# recommended hotel. The hotel's NAME/CITY/STATE columns get a HOTEL_ prefix
# first so they are distinguishable from the customer's columns after the join.
hotel_side = (
    top10_pred
    .withColumnRenamed('NAME', 'HOTEL_NAME')
    .withColumnRenamed('CITY', 'HOTEL_CITY')
    .withColumnRenamed('STATE', 'HOTEL_STATE')
)
person_pred = demog.join(hotel_side, demog.ID == top10_pred.Person_ID, 'inner')
%pyspark
# Build the final output: each recommended hotel with its score plus an air
# fare and distance for the route from the user's state to the hotel's state.
# The left join keeps hotels that have no matching route; those show 'No Data'.
# NOTE(review): several airfare rows can match one hotel and dropDuplicates
# keeps a single unspecified one per Hotel_ID -- confirm that is acceptable.
route_matches = (person_pred.HOTEL_STATE == airfares_recent.arr_state) & (person_pred.STATE == airfares_recent.dep_state)
top10_fares = person_pred.join(airfares_recent, route_matches, 'left')
top10_fares = top10_fares.dropDuplicates(['Hotel_ID'])
top10_fares.createOrReplaceTempView("top10_fares")
final_sql = "SELECT Hotel_ID, HOTEL_NAME, HOTEL_CITY, HOTEL_STATE, prediction, Type, case when fare_num is null then 'No Data' else fare_num end AS Fare, case when nsmiles is null then 'No Data' else nsmiles end AS Distance FROM top10_fares order by prediction desc"
top10_pred_final = spark.sql(final_sql)
top10_pred_final.show()
#z.show(top10_fares.select('Person_ID', 'Hotel_ID', 'prediction', 'HOTEL_NAME', 'HOTEL_CITY', 'HOTEL_STATE', 'Type', 'fare_num', 'nsmiles', 'passengers'))
%pyspark
# For beaches
# End-to-end run for user 10001: show the user, show their ratings, train on
# the merged ratings, score unrated hotels, and attach fares to the top 10.
print("User Information:")
demog.filter(demog.ID == '10001').show()

print("User supplied ratings:")
# "inferSchema" was previously misspelled ("infeschema"), which Spark ignores.
test_inp = spark.read.format("csv").options(header="true", inferSchema="true").load("project/test-2.csv")
for int_col in ("Person_ID", "Hotel_ID", "User_rating_summ"):
    test_inp = test_inp.withColumn(int_col, test_inp[int_col].cast(IntegerType()))
test_inp.createOrReplaceTempView("test_inp")
spark.sql("SELECT A.*, B.NAME, B.CITY, B.STATE, B.Type FROM test_inp A INNER JOIN HOTEL_SCHEMA_1 B ON A.Hotel_ID = B.HOTEL_ID_NEW").show()

print("Predictions for the user:")
# Training set = existing ratings + this user's ratings (columns matched by
# position; the literal 1s fill the third and fourth columns).
person_hotel_model = spark.sql("select * from person_hotel_1 union select Person_ID, Hotel_ID, 1, 1, User_rating_summ from test_inp")
test_predictions = spark.read.format("csv").options(header="true", inferSchema="true").load("project/test_predictions-2.csv")
test_predictions.createOrReplaceTempView("test_predictions_2")
# Keep only candidate hotels the user has not rated.
test_predictions_reco = spark.sql("select a.* from test_predictions_2 a left join test_inp b on a.Hotel_ID = b.Hotel_ID where b.Hotel_ID is null")
for int_col in ("Person_ID", "Hotel_ID", "User_rating_summ"):
    test_predictions_reco = test_predictions_reco.withColumn(int_col, test_predictions_reco[int_col].cast(IntegerType()))

pred = ALS_recom(person_hotel_model, test_predictions_reco)
top10_pred = top10_reco(pred)
# Prefix the hotel columns so they do not collide with the customer's columns.
person_pred = demog.join(top10_pred.withColumnRenamed('NAME', 'HOTEL_NAME').withColumnRenamed('CITY', 'HOTEL_CITY').withColumnRenamed('STATE', 'HOTEL_STATE'), demog.ID == top10_pred.Person_ID, 'inner')
# Match fares on (user's state -> hotel's state); one row is kept per hotel.
top10_fares = person_pred.join(airfares_recent, (person_pred.HOTEL_STATE == airfares_recent.arr_state) & (person_pred.STATE == airfares_recent.dep_state), 'left').dropDuplicates(['Hotel_ID'])
top10_fares.createOrReplaceTempView("top10_fares")
top10_pred_final = spark.sql("SELECT Hotel_ID, HOTEL_NAME, HOTEL_CITY, HOTEL_STATE, prediction, Type, case when fare_num is null then 'No Data' else fare_num end AS Fare, case when nsmiles is null then 'No Data' else nsmiles end AS Distance FROM top10_fares order by prediction desc")
top10_pred_final.show()
%pyspark
# For ski
# End-to-end run for user 10002: show the user, show their ratings, train on
# the merged ratings, score unrated hotels, and attach fares to the top 10.
print("User Information:")
demog.filter(demog.ID == '10002').show()

print("User supplied ratings:")
# "inferSchema" was previously misspelled ("infeschema"), which Spark ignores.
test_inp = spark.read.format("csv").options(header="true", inferSchema="true").load("project/test-3.csv")
for int_col in ("Person_ID", "Hotel_ID", "User_rating_summ"):
    test_inp = test_inp.withColumn(int_col, test_inp[int_col].cast(IntegerType()))
test_inp.createOrReplaceTempView("test_inp")
spark.sql("SELECT A.*, B.NAME, B.CITY, B.STATE, B.Type FROM test_inp A INNER JOIN HOTEL_SCHEMA_1 B ON A.Hotel_ID = B.HOTEL_ID_NEW").show()

print("Predictions for the User:")
# Training set = existing ratings + this user's ratings (columns matched by
# position; the literal 1s fill the third and fourth columns).
person_hotel_model = spark.sql("select * from person_hotel_1 union select Person_ID, Hotel_ID, 1, 1, User_rating_summ from test_inp")
test_predictions = spark.read.format("csv").options(header="true", inferSchema="true").load("project/test_predictions-3.csv")
test_predictions.createOrReplaceTempView("test_predictions_3")
# Keep only candidate hotels the user has not rated.
test_predictions_reco = spark.sql("select a.* from test_predictions_3 a left join test_inp b on a.Hotel_ID = b.Hotel_ID where b.Hotel_ID is null")
for int_col in ("Person_ID", "Hotel_ID", "User_rating_summ"):
    test_predictions_reco = test_predictions_reco.withColumn(int_col, test_predictions_reco[int_col].cast(IntegerType()))

pred = ALS_recom(person_hotel_model, test_predictions_reco)
top10_pred = top10_reco(pred)
# Prefix the hotel columns so they do not collide with the customer's columns.
person_pred = demog.join(top10_pred.withColumnRenamed('NAME', 'HOTEL_NAME').withColumnRenamed('CITY', 'HOTEL_CITY').withColumnRenamed('STATE', 'HOTEL_STATE'), demog.ID == top10_pred.Person_ID, 'inner')
# Match fares on (user's state -> hotel's state); one row is kept per hotel.
top10_fares = person_pred.join(airfares_recent, (person_pred.HOTEL_STATE == airfares_recent.arr_state) & (person_pred.STATE == airfares_recent.dep_state), 'left').dropDuplicates(['Hotel_ID'])
top10_fares.createOrReplaceTempView("top10_fares")
top10_pred_final = spark.sql("SELECT Hotel_ID, HOTEL_NAME, HOTEL_CITY, HOTEL_STATE, prediction, Type, case when fare_num is null then 'No Data' else fare_num end AS Fare, case when nsmiles is null then 'No Data' else nsmiles end AS Distance FROM top10_fares order by prediction desc")
top10_pred_final.show()