In [1]:
sc

In [3]:
# reading the training data.
base_df = spark.read.option("header", "true").csv("s3a://ids-2017-group42/train_ver2.csv")

In [7]:
# required import libraries.
from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Vectors
import numpy as np
from datetime import datetime
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DateType

In [8]:
# checking null values
names = base_df.schema.names
null_count = []
for col_x in names:
    null_count.append(base_df.where(col(col_x).isNull()).count())
    
print(null_count)

In [9]:
len(base_df.schema.names)

24

In [10]:
# filtering out columns where checking and checkout dates are null
base_df = base_df.where(col('srch_ci').isNotNull()).where(col('srch_co').isNotNull())
# base_df = base_df
base_df.take(1)

[Row(date_time='2014-07-16 10:00:06', site_name='2', posa_continent='3', user_location_country='66', user_location_region='189', user_location_city='10067', orig_destination_distance=None, user_id='501', is_mobile='0', is_package='0', channel='2', srch_ci='2014-08-01', srch_co='2014-08-02', srch_adults_cnt='2', srch_children_cnt='0', srch_rm_cnt='1', srch_destination_id='8267', srch_destination_type_id='1', is_booking='0', cnt='1', hotel_continent='2', hotel_country='50', hotel_market='675', hotel_cluster='98')]

In [11]:
# converting date columns to relevant data formats.
conv_to_date = udf(lambda x: datetime.strptime(x, '%Y-%m-%d'), DateType())

base_df = base_df.withColumn("srch_ci", conv_to_date(col('srch_ci'))).withColumn("srch_co", conv_to_date(col('srch_co')))
# base_df = base_df

In [13]:
# getting a list of columns in base_df to convert to appropriate data type.
col_names = base_df.schema.names
# removing date_time column.
col_names.remove('date_time')

base_df = base_df.select(*col_names)
print(len(col_names))

23


In [14]:
# converting all data to int type.
col_names.remove('srch_ci')
col_names.remove('srch_co')

# casting columns to int.
for names in col_names:
    base_df = base_df.withColumn(names, base_df[names].cast("int"))

In [16]:
# move all the column additions here, calculate all new columns before scaling.
# Adding Season column for applying filtering.
from pyspark.sql import functions as F

def returnSeason(x):
    if x in [1, 2, 12]:
        return "0"
    elif x in [3, 4, 5]:
        return "1"
    elif x in [6, 7, 8]:
        return "2"
    elif x in [9, 10, 11]:
        return "3"
    
season_udf = udf(returnSeason, "int")

# adding column to total the number of people in the booking.
def addPeople(x1, x2):
    return x1+x2

udf_people = udf(addPeople, "int")

# adding columns to the dataframe.
base_df = base_df.withColumn('travel_season', season_udf(F.month(col('srch_ci')))) \
            .withColumn("no_persons", udf_people(col('srch_adults_cnt'), col('srch_children_cnt'))) \
            .withColumn("days_stayed", F.datediff(col('srch_co'), col('srch_ci')))

In [18]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

# dont need to scale the following columns.
col_names.remove('orig_destination_distance')
col_names.remove('hotel_cluster')
col_names.remove('user_id')
col_names.remove('is_mobile')
col_names.remove('is_booking')

def scaleColumns(base_df, col_name):
    # creating vector for column.
    col_feature = col_name + "_feature"
    assembler = VectorAssembler(
            inputCols=[col_name],
            outputCol=col_feature)
    assembled = assembler.transform(base_df)
    
    col_scaled = col_name + "_scaled"
    # creating scaled rows for the column.
    scaler = MinMaxScaler(inputCol=col_feature, outputCol=col_scaled)
    scalerModel = scaler.fit(assembled)
    base_df = scalerModel.transform(assembled)
    return base_df

In [20]:
# scaling all the columns.
for col_name in col_names:
    base_df = scaleColumns(base_df, col_name)

<IPython.core.display.Javascript object>

In [22]:
regression_assembler = VectorAssembler(inputCols=['posa_continent_scaled', 'user_location_country_scaled', 'user_location_region_scaled', 'user_location_city_scaled', 'hotel_continent_scaled', 'hotel_country_scaled', 'hotel_market_scaled'],
                                      outputCol='regression_features')

In [23]:
base_df = regression_assembler.transform(base_df)

In [25]:
regression_df = base_df.where(base_df['orig_destination_distance'].isNotNull())

In [29]:
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [60]:
# search space for all the regularization parameters.
regParam1 = [i/10 for i in range(1,10)]
regParam2 = [i/100 for i in range(1,10)]
regParam_range = regParam1 + regParam2
print(regParam_range)

[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09]


In [62]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# linear regression model.
lr = LinearRegression(featuresCol="regression_features", labelCol="orig_destination_distance", maxIter=10, predictionCol="predicted_distance")

# constructing ml pipeline to check the best possible parameters.
pipeline1 = Pipeline(stages=[lr])

# construction paramGrid object to choose the best regularization parameters.
paramGrib = ParamGridBuilder().addGrid(lr.regParam,regParam_range).build()

# construction regression evaluator.
regEvaluator = RegressionEvaluator(predictionCol="predicted_distance", labelCol="orig_destination_distance", metricName="r2")

# cross validator.
crossVal = CrossValidator(estimator=pipeline1, estimatorParamMaps=paramGrib, evaluator=regEvaluator, numFolds=2)

In [63]:
%%time
%%notify
# fit the cross validation model to regression_df.
regModel = crossVal.fit(regression_df)
print(regModel)

CrossValidatorModel_40d7a98cd89e1ee017dc


<IPython.core.display.Javascript object>

CPU times: user 1.6 s, sys: 672 ms, total: 2.27 s
Wall time: 2h 9min 26s


In [72]:
## getting the best model from the cross validator model.
regPipeline = regModel.bestModel

In [76]:
# saving the best model trained for further use.
regPipeline.stages[0].save("LinearRegressionModel")

In [30]:
# reloading the model, if it is trained.
regModel1 = LinearRegressionModel.load("LinearRegressionModel")

In [31]:
%%time
# doing regression on the data.
base_df = regModel1.transform(base_df)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 38.9 ms


In [33]:
%%time
# filtering df to evaluate error values.
base_df_reg = base_df.where(base_df['orig_destination_distance'].isNotNull())

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 18.8 ms


In [35]:
%%time
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# new regression model -> Random Forest Regression.
rf_model = RandomForestRegressor(featuresCol="regression_features", labelCol="orig_destination_distance", predictionCol="predicted_distance_rf")

# creating a pipeline for random forest regression.
pipeline2 = Pipeline(stages=[rf_model])
# pipeline2 = rf_model

# building parameters for random forest to choose from.
numTrees_range = [i for i in range(2,3)] # increase these parameters when training on the bigger instance.
maxDepth_range = [i for i in range(5,6)] # increase these parameters when training on the bigger instance.

# construction paramGrid object to choose the best parameters for random forest.
paramGrid2 = ParamGridBuilder().addGrid(rf_model.numTrees,numTrees_range).addGrid(rf_model.maxDepth, maxDepth_range).build()

# construction regression evaluator.
regEvaluator2 = RegressionEvaluator(predictionCol="predicted_distance_rf", labelCol="orig_destination_distance", metricName="r2")

# cross validator.
crossVal2 = CrossValidator(estimator=pipeline2, estimatorParamMaps=paramGrid2, evaluator=regEvaluator2, numFolds=2)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 12.4 ms


In [36]:
%%time
%%notify
# fit the cross validation model to regression_df.
rf_model = crossVal2.fit(regression_df)

<IPython.core.display.Javascript object>

CPU times: user 128 ms, sys: 20 ms, total: 148 ms
Wall time: 7min 38s


In [37]:
# save the Random forest model.
regPipeline2 = rf_model.bestModel

# saving the best model trained for further use.
regPipeline2.stages[0].save("RandomForestModel")

In [38]:
from pyspark.ml.regression import RandomForestRegressionModel

# load from memory
rf_model1 = RandomForestRegressionModel.load("RandomForestModel")

In [39]:
%%time
base_df_rf = rf_model1.transform(base_df)
base_df_rf = base_df_rf.where(base_df_rf['orig_destination_distance'].isNotNull())

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 59 ms


In [40]:
# print the model evaluation results. - random forest model.
print(regEvaluator2.evaluate(base_df_rf))

0.7012087821315416


In [138]:
row5_df_assembler = VectorAssembler(
        inputCols=['posa_continent_scaled','site_name_scaled','user_location_country_scaled','user_location_region_scaled','user_location_city_scaled', 'user_id'],
        outputCol="kmeans_features")

# apply filtering to reduce get unique values of user features using drop duplicates.
clustering_df = row5_df_assembler.transform(base_df) \
    .select(['user_id', 'kmeans_features']) \
    .dropDuplicates()

In [139]:
clustering_df.take(1)

[Row(user_id=9681, kmeans_features=DenseVector([0.25, 0.2157, 0.1925, 0.4012, 0.6253, 9681.0]))]

In [135]:
# to choose a good value for clustering.
k_value = base_df.select('user_location_region').distinct().count()

In [150]:
%%notify
from pyspark.ml.clustering import KMeans

# kmeans model.
kmeans = KMeans(featuresCol="kmeans_features", predictionCol="predicted_clusters",initMode="k-means||", initSteps=2,
                tol=1e-4,maxIter=20, k=15000, seed=27)
kmodel = kmeans.fit(clustering_df)
wsse = kmodel.computeCost(clustering_df)

<IPython.core.display.Javascript object>

## Training the recommendation model.

In [43]:
base_df.printSchema()

root
 |-- site_name: integer (nullable = true)
 |-- posa_continent: integer (nullable = true)
 |-- user_location_country: integer (nullable = true)
 |-- user_location_region: integer (nullable = true)
 |-- user_location_city: integer (nullable = true)
 |-- orig_destination_distance: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- is_mobile: integer (nullable = true)
 |-- is_package: integer (nullable = true)
 |-- channel: integer (nullable = true)
 |-- srch_ci: date (nullable = true)
 |-- srch_co: date (nullable = true)
 |-- srch_adults_cnt: integer (nullable = true)
 |-- srch_children_cnt: integer (nullable = true)
 |-- srch_rm_cnt: integer (nullable = true)
 |-- srch_destination_id: integer (nullable = true)
 |-- srch_destination_type_id: integer (nullable = true)
 |-- is_booking: integer (nullable = true)
 |-- cnt: integer (nullable = true)
 |-- hotel_continent: integer (nullable = true)
 |-- hotel_country: integer (nullable = true)
 |-- hotel_market: integer 

In [44]:
# converting to dataframe to rdd, ml library doesnt have ALS factorization method.
new_df = base_df.select('user_id', 'hotel_cluster', 'predicted_distance', 'is_booking', 'cnt', 'travel_season')

In [45]:
new_df = scaleColumns(new_df, 'predicted_distance')

In [47]:
# converting the selected df to rdd.
recommendation_rdd = new_df.select('user_id', 'hotel_cluster', 'predicted_distance_scaled', 'is_booking', 'cnt').rdd

In [49]:
# mapping the rdd to key-value pairs.
recommendation_rdd = recommendation_rdd.map(lambda x: ((x[0], x[1]) , (x[2].values[0], x[3:])))
recommendation_rdd.take(1)

[((501, 98), (0.36599233403023351, (0, 1)))]

In [1]:
## Transforming the rdd to use as input for colloborative filtering

In [50]:
recommendation_rdd = recommendation_rdd.map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1])))
recommendation_rdd.take(1)

[((501, 98), (0.36599233403023351, 0, 1))]

In [51]:
count_dict = recommendation_rdd.map(lambda x: x[0]).countByValue()
count_broadcast = sc.broadcast(count_dict)

In [53]:
rec_rdd_1 = recommendation_rdd.reduceByKey(lambda x, y: x+y)
rec_rdd_1.take(1)

[((92893, 17), (0.40782858329157284, 0, 1))]

In [54]:
# verifying that the transformation take place.
rec_rdd_2 = rec_rdd_1.filter(lambda x: len(x[1]) > 3 and len(x[1])%3 != 0)
rec_rdd_2.take(1)

[]

In [55]:
def combineRatings(x):
    if len(x[1]) > 3:
        length = len(x[1])
        index = 0
        count = 0
        dist, is_b, srch_cnt = 0, 1, 0
        while length != index:
            dist += x[1][index]
            index += 1
            is_b *= x[1][index]
            index += 1
            srch_cnt += x[1][index]
            index += 1
            count += 1
        return (x[0], (1 - (dist/count), is_b, srch_cnt))
    else:
        return x

In [56]:
rec_rdd_3 = rec_rdd_1.map(combineRatings)
rec_rdd_3.take(1)

[((92893, 17), (0.40782858329157284, 0, 1))]

In [57]:
rec_rdd_2 = rec_rdd_3.filter(lambda x: len(x[1]) == 3)
rec_rdd_2.count()

1679826

In [60]:
rec_rdd4 = rec_rdd_3.map(lambda x: (x[0], x[1], count_broadcast.value[x[0]]))

In [62]:
rec_rdd5 = rec_rdd4.map(lambda x: (x[0], (x[1][0], x[1][1], x[1][2], x[2])))
rec_rdd5.take(1)

[((92893, 17), (0.40782858329157284, 0, 1, 1))]

In [63]:
ratings_rdd = rec_rdd5.map(lambda x: (x[0], x[1][0] * (x[1][1] + x[1][2] + x[1][3])))
ratings_rdd.take(1)

[((92893, 17), 0.81565716658314569)]

In [65]:
from pyspark.mllib.recommendation import Rating

ratings_rdd = ratings_rdd.map(lambda x: Rating(x[0][0], x[0][1], x[1]))
ratings_rdd.take(1)

[Rating(user=92893, product=17, rating=0.8156571665831457)]

In [66]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel

# Build the recommendation model using Alternating Least Squares
rank_list = [i for i in range(10, 55, 5)]
num_list = [i for i in range(10, 22, 2)]
alpha_list = [i/10 for i in range(1, 10)]
model_list = {}
error_list = {}

print(rank_list)
print(num_list)
print(alpha_list)

[10, 15, 20, 25, 30, 35, 40, 45, 50]
[10, 12, 14, 16, 18, 20]
[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]


In [69]:
ratings = ratings_rdd
testdata = ratings.map(lambda p: (p[0], p[1]))

# recommendation model -> used for finding out the best hyper parameters.
# for rank in rank_list:
#     for numIterations in num_list:
#         for a in alpha_list:
#             rec_model = ALS.trainImplicit(ratings_rdd, rank, numIterations, alpha=a)
#             x = str(rank) + "-" + str(numIterations) + "-" + str(a) + "-model"
            
#             predictions = rec_model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
#             ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
#             MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
#             error_list[x] = MSE
#             del rec_model

# once optimal hyperparameters were known, we used those to train further models.
rec_model = ALS.trainImplicit(ratings_rdd, 10, 12, alpha=0.01)
predictions = rec_model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings_rdd.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()

In [70]:
print(MSE)

2.32202371917


In [72]:
#saving the recommendation model.
rec_model.save(sc, "RecommendationModel")

In [73]:
# load model back, no need to retrain the model.
rec_model1 = MatrixFactorizationModel.load(sc, "RecommendationModel")

## Generating Test File

In [71]:
# reading test data.
test_df = spark.read.option("header", "true").csv("s3a://ids-2017-group42/test.csv")

In [74]:
test_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- site_name: string (nullable = true)
 |-- posa_continent: string (nullable = true)
 |-- user_location_country: string (nullable = true)
 |-- user_location_region: string (nullable = true)
 |-- user_location_city: string (nullable = true)
 |-- orig_destination_distance: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- is_mobile: string (nullable = true)
 |-- is_package: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- srch_ci: string (nullable = true)
 |-- srch_co: string (nullable = true)
 |-- srch_adults_cnt: string (nullable = true)
 |-- srch_children_cnt: string (nullable = true)
 |-- srch_rm_cnt: string (nullable = true)
 |-- srch_destination_id: string (nullable = true)
 |-- srch_destination_type_id: string (nullable = true)
 |-- hotel_continent: string (nullable = true)
 |-- hotel_country: string (nullable = true)
 |-- hotel_market: string (nullable = true)


In [75]:
# applying same preprocessing transformation that were applied to base df.
test_df = test_df.withColumn("srch_ci", conv_to_date(col('srch_ci'))).withColumn("srch_co", conv_to_date(col('srch_co')))

In [78]:
col_names = test_df.schema.names
print(col_names)

['id', 'site_name', 'posa_continent', 'user_location_country', 'user_location_region', 'user_location_city', 'orig_destination_distance', 'user_id', 'is_mobile', 'is_package', 'channel', 'srch_ci', 'srch_co', 'srch_adults_cnt', 'srch_children_cnt', 'srch_rm_cnt', 'srch_destination_id', 'srch_destination_type_id', 'hotel_continent', 'hotel_country', 'hotel_market']


In [83]:
col_names.remove('date_time')
# # names_df.remove('BookingDate')
test_df = test_df.select(*col_names)

In [85]:
col_names.remove('srch_ci')
col_names.remove('srch_co')

for names in col_names:
    test_df = test_df.withColumn(names, test_df[names].cast("int"))

In [89]:
test_df = test_df.withColumn('travel_season', season_udf(F.month(col('srch_ci')))) \
    .withColumn("no_persons", udf_people(col('srch_adults_cnt'), col('srch_children_cnt'))) \
    .withColumn("days_stayed", F.datediff(col('srch_co'), col('srch_ci')))

In [92]:
# dont need to scale the following columns.
col_names.remove('orig_destination_distance')
col_names.remove('user_id')
col_names.remove('is_mobile')
col_names.remove('is_booking')

def scaleColumns(base_df, col_name):
    # creating vector for column.
    col_feature = col_name + "_feature"
    assembler = VectorAssembler(
            inputCols=[col_name],
            outputCol=col_feature)
    assembled = assembler.transform(base_df)
    
    col_scaled = col_name + "_scaled"
    # creating scaled rows for the column.
    scaler = MinMaxScaler(inputCol=col_feature, outputCol=col_scaled)
    scalerModel = scaler.fit(assembled)
    base_df = scalerModel.transform(assembled)
    return base_df

In [95]:
%%notify
# scaling all the columns.
for col_name in col_names:
    test_df = scaleColumns(test_df, col_name)

<IPython.core.display.Javascript object>

In [96]:
# transformations for running regression.
test_df = regression_assembler.transform(test_df)

In [97]:
# using the pretrained model to estimate test distances.
test_df = regModel1.transform(test_df)

In [104]:
new_test_df = test_df.select('id','user_id')

In [117]:
eval_ratings = new_test_df.rdd

In [113]:
# generating recommendation for all users.
rec_ratings = rec_model1.recommendProductsForUsers(5)

In [123]:
rec_ratings1 = rec_ratings.map(lambda x: (x[0], (x[1][0].product, x[1][1].product, x[1][2].product, x[1][3].product, x[1][4].product)))
rec_ratings1.take(1)

[(318784, (41, 5, 70, 25, 98))]

In [182]:
final_rdd = eval_ratings.join(rec_ratings1)

In [184]:
final_rdd.filter(lambda x: x==None).count()

0

In [186]:
final_rdd = final_rdd.map(lambda x: (x[0], x[1][0], x[1][1]))
final_rdd.take(1)

[(0, 1, (21, 98, 18, 95, 70))]

In [191]:
final_rdd = final_rdd.map(lambda x: (x[1], x[0], x[2]))

In [192]:
# id, user_id, cluster recommendation.
final_rdd.take(1)

[(1, 0, (21, 98, 18, 95, 70))]

In [203]:
# verifying all users have recommendation.
final_rdd_n.filter(lambda x: x[2] == None).count()

0

In [204]:
final_rdd = final_rdd_n.map(lambda x: x)

In [205]:
final_rdd1 = final_rdd.map(lambda x: (x[1], ' '.join(str(d) for d in x[2])))
final_rdd1.take(1)

[(0, '21 98 18 95 70')]

In [206]:
final_rdd2 = final_rdd1.map(lambda x: (str(x[0]), x[1]))
final_rdd2.take(1)

[('0', '21 98 18 95 70')]

In [207]:
final_rdd3 = final_rdd2.map(lambda x: ','.join(i for i in x))
final_rdd3.take(1)

['0,21 98 18 95 70']

In [208]:
final_rdd2.take(1)

[('0', '21 98 18 95 70')]

In [215]:
write_df = final_rdd2.toDF(['id', 'hotel_cluster'])

In [216]:
write_df.take(1)

[Row(id='0', hotel_cluster='21 98 18 95 70')]

In [222]:
write_df.coalesce(1).write.option("header","true").csv('s3a://ids-2017-group42/result1.csv')

## >>>end