Stay Recommendations:
Machine Learning model training and prediction

In [75]:
import os

import findspark

# Locate the Spark installation. An explicit SPARK_HOME environment variable wins so the
# notebook is not tied to a single machine; when it is unset (or empty) we fall back to
# the Anaconda install path this notebook was originally written against.
DEFAULT_SPARK_HOME = "C:/ProgramData/Anaconda3/Lib/site-packages/pyspark/"
findspark.init(os.environ.get("SPARK_HOME") or DEFAULT_SPARK_HOME)

In [76]:
from pyspark.sql import SparkSession

# Create the session for this notebook, or pick up one that is already running.
spark = (
    SparkSession.builder
    .appName("Final_project")
    .getOrCreate()
)
spark

In [77]:
# Keep a handle on the SparkContext behind the session; the bare `sc` on the
# last line makes the notebook display it.
sc = spark.sparkContext
sc

In [78]:
# import some packages from the spark.sql,spark.ml to deal with data types and schemas
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import substring, length, col, expr,asc, desc, col, struct, monotonically_increasing_id, mean, udf
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [79]:
# Read the cleaned ETL output into a DataFrame. The first CSV row is treated as the
# header and Spark infers the column types.
# df_all contains all of the data the ML algorithm will be run on. The file loaded here
# is the one stored for a particular city (here New York) and is used for training.
df_all = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("Cleaned_ETL_for_ML.csv")
)
df_all.show(5)

+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+
|_c0|count_restaurant|count_crime|      id|neighbourhood_cleansed|neighbourhood_group_cleansed|Latitude_listing|Longitude_listing|price|number_of_reviews|review_scores_rating|
+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+
|  0|             987|         37|10034090|          Williamsburg|                    Brooklyn|        40.71049|        -73.94515|   40|                1|               100.0|
|  1|            4375|         99| 1022204|       Lower East Side|                   Manhattan|        40.72198|        -73.98932|  110|               49|                98.0|
|  2|             686|         24|10234090|           Sunset Park|                    Brooklyn|         40.6642|        

In [80]:
# Print the schema Spark inferred for the CSV so the column types can be checked.
df_all.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- count_restaurant: integer (nullable = true)
 |-- count_crime: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- neighbourhood_group_cleansed: string (nullable = true)
 |-- Latitude_listing: double (nullable = true)
 |-- Longitude_listing: double (nullable = true)
 |-- price: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- review_scores_rating: double (nullable = true)



In [81]:
# The columns used for training are count_restaurant, count_crime, price and
# review_scores_rating. Parsing of the price string was moved to the pre-processing stage,
# so price is already numeric here.
#
# Review score ratings are converted into integer classes by dividing by 5 and
# truncating (e.g. 100.0 -> 20, 98.0 -> 19); rows without a price are dropped.
# NOTE: this cell overwrites df_all, so running it twice divides the rating by 5 again.
rating_bucket = (col("review_scores_rating") / 5).cast("int")
df_all = (
    df_all
    .withColumn("review_scores_rating", rating_bucket)
    .filter(col("price").isNotNull())
)
df_all.show(5)

+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+
|_c0|count_restaurant|count_crime|      id|neighbourhood_cleansed|neighbourhood_group_cleansed|Latitude_listing|Longitude_listing|price|number_of_reviews|review_scores_rating|
+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+
|  0|             987|         37|10034090|          Williamsburg|                    Brooklyn|        40.71049|        -73.94515|   40|                1|                  20|
|  1|            4375|         99| 1022204|       Lower East Side|                   Manhattan|        40.72198|        -73.98932|  110|               49|                  19|
|  2|             686|         24|10234090|           Sunset Park|                    Brooklyn|         40.6642|        

In [82]:

# Keep only the listings that have a rating to learn from.
train_use_data = df_all.where(col("review_scores_rating").isNotNull())
train_use_data.show(5)

# The numeric attributes are min-max scaled so that they have a comparable impact on the
# model parameters.
# NOTE(review): the scaler is fit on the full dataset before the train/test split that
# happens later in the notebook, so test rows influence the min/max used for scaling.

# UDF that unwraps a one-element ML vector into a plain double, rounded to 3 decimals.
unlist = udf(lambda x: round(float(list(x)[0]), 3), DoubleType())


def _min_max_scale(frame, column):
    """Return `frame` with `column` replaced by its min-max scaled value.

    The column is wrapped into a vector (VectorAssembler), scaled with a MinMaxScaler
    using its default settings, unwrapped back to a double with `unlist`, and the two
    temporary helper columns are dropped again.
    """
    vect_col = column + "_Vect"
    scaled_col = column + "_Scaled"
    stages = [
        VectorAssembler(inputCols=[column], outputCol=vect_col),
        MinMaxScaler(inputCol=vect_col, outputCol=scaled_col),
    ]
    scaled = Pipeline(stages=stages).fit(frame).transform(frame)
    return scaled.withColumn(column, unlist(scaled_col)).drop(vect_col, scaled_col)


# Scale each attribute in turn, one pipeline fit per column.
for feature_name in ["count_restaurant", "count_crime", "price"]:
    train_use_data = _min_max_scale(train_use_data, feature_name)

print("After Scaling :")
train_use_data.show(5)

+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+
|_c0|count_restaurant|count_crime|      id|neighbourhood_cleansed|neighbourhood_group_cleansed|Latitude_listing|Longitude_listing|price|number_of_reviews|review_scores_rating|
+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+
|  0|             987|         37|10034090|          Williamsburg|                    Brooklyn|        40.71049|        -73.94515|   40|                1|                  20|
|  1|            4375|         99| 1022204|       Lower East Side|                   Manhattan|        40.72198|        -73.98932|  110|               49|                  19|
|  2|             686|         24|10234090|           Sunset Park|                    Brooklyn|         40.6642|        

In [83]:
# Split the scaled data 75% / 25% for model training and testing, passing a fixed seed.
# `a` is the split the model is later fitted on; `b` is the split it is later scored on.
a,b=train_use_data.randomSplit([0.75, 0.25],seed=5)
a.show(2)

+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+
|_c0|count_restaurant|count_crime|      id|neighbourhood_cleansed|neighbourhood_group_cleansed|Latitude_listing|Longitude_listing|price|number_of_reviews|review_scores_rating|
+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+
|  0|            0.13|      0.121|10034090|          Williamsburg|                    Brooklyn|        40.71049|        -73.94515| 0.04|                1|                  20|
|  1|           0.578|       0.33| 1022204|       Lower East Side|                   Manhattan|        40.72198|        -73.98932| 0.11|               49|                  19|
+---+----------------+-----------+--------+----------------------+----------------------------+----------------+--------

In [84]:

# Transform the training dataframe to add a "features" vector column, so that the
# Logistic Regression model can be trained with features (vector column) and the
# output review_scores_rating.
# NOTE(review): 'review_scores_rating' is listed in inputCols below and is also the
# labelCol of the LogisticRegression fitted on this data, so the label itself is fed to
# the model as a feature (target leakage). Removing it means changing this assembler and
# the test-set assembler together so that both produce vectors of the same size.
assembler = VectorAssembler(inputCols=['count_restaurant', 'count_crime', 'price', 'review_scores_rating'],outputCol="features")
a=assembler.transform(a)
a.show(2,truncate=False)

+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+----------------------+
|_c0|count_restaurant|count_crime|id      |neighbourhood_cleansed|neighbourhood_group_cleansed|Latitude_listing|Longitude_listing|price|number_of_reviews|review_scores_rating|features              |
+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+----------------------+
|0  |0.13            |0.121      |10034090|Williamsburg          |Brooklyn                    |40.71049        |-73.94515        |0.04 |1                |20                  |[0.13,0.121,0.04,20.0]|
|1  |0.578           |0.33       |1022204 |Lower East Side       |Manhattan                   |40.72198        |-73.98932        |0.11 |49               |19                  |[0.578,0.33,0.11,19.0]|
+---+

In [85]:
#LogisticRegression is already imported from pyspark.ml.classification in the imports cell near the top of the notebook.


In [86]:
# Configure the logistic regression that predicts review_scores_rating from the
# assembled "features" vector.
# regParam was settled at 0.08 after testing a number of values.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='review_scores_rating',
    maxIter=1000,
    regParam=0.08,
    elasticNetParam=0.6,
)

# Train on the training split.
lrModel = lr.fit(a)

# Report the fitted coefficient matrix and intercept vector of the multinomial model.
coefficient_text = str(lrModel.coefficientMatrix)
intercept_text = str(lrModel.interceptVector)
print("Coefficients: \n" + coefficient_text)
print("Intercept: " + intercept_text)

Coefficients: 
21 X 4 CSCMatrix
(16,1) -0.0244
(19,2) 0.2896
(20,3) 0.8884
Intercept: [-587.5366817809942,-587.5366817809942,-587.5366817809942,-587.5366817809942,184.60269291806563,-587.5366817809942,181.95814420286464,179.3932111580719,184.39715070568317,181.79106361204282,183.7499080526778,181.5904386988544,185.60780183464468,184.26073870362552,185.47541521055112,185.5669916987404,187.7372042424789,187.4540505154091,188.44442210595494,183.53872685093125,172.11544839437377]


In [87]:
# Add the "features" vector column to the testing split as well, so the fitted model can
# score it. The input columns must mirror the ones used for the training split; otherwise
# the vectors will not match what the model was trained on.
test_feature_cols = ['count_restaurant', 'count_crime', 'price', 'review_scores_rating']
assembler = VectorAssembler(inputCols=test_feature_cols, outputCol="features")
b = assembler.transform(b)
b.show(2)

+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+--------------------+
|_c0|count_restaurant|count_crime|      id|neighbourhood_cleansed|neighbourhood_group_cleansed|Latitude_listing|Longitude_listing|price|number_of_reviews|review_scores_rating|            features|
+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+--------------------+
| 12|           0.041|      0.128|11948586|              Flatbush|                    Brooklyn|        40.63836|        -73.95698|0.085|                6|                  14|[0.041,0.128,0.08...|
| 15|            0.09|      0.178|12643924|  Prospect-Lefferts...|                    Brooklyn|        40.65447|        -73.96122|0.041|               44|                  18|[0.09,0.178,0.041...|
+---+----------

In [88]:
# Score the testing split with the fitted model and show the predictions next to the
# actual review_scores_rating. The transform adds the rawPrediction, probability and
# prediction columns seen in the output.
predictions_b = lrModel.transform(b)
predictions_b.show(2)

+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+--------------------+--------------------+--------------------+----------+
|_c0|count_restaurant|count_crime|      id|neighbourhood_cleansed|neighbourhood_group_cleansed|Latitude_listing|Longitude_listing|price|number_of_reviews|review_scores_rating|            features|       rawPrediction|         probability|prediction|
+---+----------------+-----------+--------+----------------------+----------------------------+----------------+-----------------+-----+-----------------+--------------------+--------------------+--------------------+--------------------+----------+
| 12|           0.041|      0.128|11948586|              Flatbush|                    Brooklyn|        40.63836|        -73.95698|0.085|                6|                  14|[0.041,0.128,0.08...|[-587.53668178099...|[0.0,0.0,0.0,0.0,...|      18.0|


In [91]:
# Evaluate the accuracy of the model on the testing split.

# Compare the predicted class with the true label (review_scores_rating) and report the
# test error, i.e. the share of test rows that were classified incorrectly.
evaluator = MulticlassClassificationEvaluator(
    labelCol="review_scores_rating", predictionCol="prediction", metricName="accuracy")
# `predictions_b` is the scored testing split produced by lrModel.transform(b); the
# previous name `predictions` is not defined anywhere in this notebook.
accuracy = evaluator.evaluate(predictions_b)
print("Test Error = %f %%" % (100 * float(1.0 - accuracy)))

Test Error = 15.500000 %


In [90]:
#Run this cell to launch the GUI; the GUI notebook can also be executed separately.
#The GUI lets the customer search for stay recommendations.
#%run ./Tkinter_Gui.ipynb