In [12]:
import findspark as fds
fds.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DS_Final_Project").getOrCreate()

In [13]:
hotels = spark.read.csv('Hotels_data_Changed.csv',inferSchema=True,header=True)

hotels.show()

+---+-----------+--------------+--------------+----+--------------+--------------+-------------+---------------+--------------------+-----------+-------+---------+------------+------------------+
|_c0|Snapshot ID| Snapshot Date|  Checkin Date|Days|Original Price|Discount Price|Discount Code|Available Rooms|          Hotel Name|Hotel Stars|DayDiff|  WeekDay|DiscountDiff|      DiscountPerc|
+---+-----------+--------------+--------------+----+--------------+--------------+-------------+---------------+--------------------+-----------+-------+---------+------------+------------------+
|  0|          1|7/17/2015 0:00|8/12/2015 0:00|   5|          1178|          1040|            1|              6|Best Western Plus...|          3|     26|Wednesday|         138|11.714770797962649|
|  1|          1|7/17/2015 0:00|8/19/2015 0:00|   5|          1113|           982|            1|              8|Best Western Plus...|          3|     33|Wednesday|         131|11.769991015274034|
|  2|          1|7/1

In [14]:
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy('Snapshot Date', 'Checkin Date', 'DayDiff', 'Hotel Name', 'WeekDay')

# Take the max discount code from the grouped data
hotelsBeforeClassifcation = hotels.withColumn('maxDis', f.max('DiscountPerc').over(w))\
    .where(f.col('DiscountPerc') == f.col('maxDis'))\
    .drop('maxDis')\
    .select('Snapshot Date', 'Checkin Date', 'DayDiff', 'Hotel Name', 'WeekDay', 'Discount Code')
hotelsBeforeClassifcation.show()

+---------------+---------------+-------+--------------------+---------+-------------+
|  Snapshot Date|   Checkin Date|DayDiff|          Hotel Name|  WeekDay|Discount Code|
+---------------+---------------+-------+--------------------+---------+-------------+
|  1/1/2016 0:00| 1/13/2016 0:00|     12|Cassa Hotel 45th ...|Wednesday|            1|
|  1/1/2016 0:00| 1/13/2016 0:00|     12|The New York EDITION|Wednesday|            3|
|  1/1/2016 0:00| 1/29/2016 0:00|     28|Hampton Inn Manha...|   Friday|            4|
| 10/1/2015 0:00| 10/2/2015 0:00|      1|Hilton Garden Inn...|   Friday|            4|
|10/10/2015 0:00|11/11/2015 0:00|     32|Hyatt Union Squar...|Wednesday|            1|
|10/11/2015 0:00|10/18/2015 0:00|      7|DoubleTree by Hil...|   Sunday|            1|
|10/11/2015 0:00|10/27/2015 0:00|     16|The Carlyle A Ros...|  Tuesday|            1|
|10/11/2015 0:00|10/27/2015 0:00|     16|The Carlyle A Ros...|  Tuesday|            2|
|10/11/2015 0:00| 11/4/2015 0:00|     24|  

In [4]:
hotelsBeforeClassifcation.coalesce(1).write.csv('Spark_Classification_Data', header=True)# Loading the data from csv

In [35]:
hotelsForClassifcation = spark.read.csv('Spark_Classification_Data',inferSchema=True,header=True)

In [31]:
hotelsForClassifcation.show(5)

+---------------+---------------+-------+--------------------+---------+-------------+
|  Snapshot Date|   Checkin Date|DayDiff|          Hotel Name|  WeekDay|Discount Code|
+---------------+---------------+-------+--------------------+---------+-------------+
|  1/1/2016 0:00| 1/13/2016 0:00|     12|Cassa Hotel 45th ...|Wednesday|            1|
|  1/1/2016 0:00| 1/13/2016 0:00|     12|The New York EDITION|Wednesday|            3|
|  1/1/2016 0:00| 1/29/2016 0:00|     28|Hampton Inn Manha...|   Friday|            4|
| 10/1/2015 0:00| 10/2/2015 0:00|      1|Hilton Garden Inn...|   Friday|            4|
|10/10/2015 0:00|11/11/2015 0:00|     32|Hyatt Union Squar...|Wednesday|            1|
+---------------+---------------+-------+--------------------+---------+-------------+
only showing top 5 rows



In [36]:
from pyspark.sql.functions import udf,unix_timestamp,col
spHotelsClassification = hotelsForClassifcation.withColumn('Snapshot Date', unix_timestamp(col('Snapshot Date'), "dd/mm/yyyy hh:mm:ss a"))
spHotelsClassification.show(5)

+-------------+---------------+-------+--------------------+---------+-------------+
|Snapshot Date|   Checkin Date|DayDiff|          Hotel Name|  WeekDay|Discount Code|
+-------------+---------------+-------+--------------------+---------+-------------+
|         null| 1/13/2016 0:00|     12|Cassa Hotel 45th ...|Wednesday|            1|
|         null| 1/13/2016 0:00|     12|The New York EDITION|Wednesday|            3|
|         null| 1/29/2016 0:00|     28|Hampton Inn Manha...|   Friday|            4|
|         null| 10/2/2015 0:00|      1|Hilton Garden Inn...|   Friday|            4|
|         null|11/11/2015 0:00|     32|Hyatt Union Squar...|Wednesday|            1|
+-------------+---------------+-------+--------------------+---------+-------------+
only showing top 5 rows



In [12]:
hotelsForClassifcation.show(1)
from pyspark.sql.functions import udf,unix_timestamp,col
from pyspark.sql.types import IntegerType,StringType
#convert non numeric data to numeric data for algorithem

def weekToDay(x):
    try:
        return {
        'Sunday': 1,
        'Monday': 2,
        'Tuesday': 3,
        'Wednesday': 4,
        'Thursday': 5,
        'Friday': 6,
        'Saturday': 7
    }[x]
    except:
        return -1

weekToDay_udf = udf(weekToDay, IntegerType())
hotelsForClassifcation.show(1)
spHotelsClassification = hotelsForClassifcation.withColumn('Snapshot Date', unix_timestamp(col('Snapshot Date'), "mm/dd/yyyy")). \
        withColumn('Checkin Date', unix_timestamp(hotelsForClassifcation['Checkin Date'], "mm/dd/yyyy"))
spHotelsClassification.show(5)
newCol = weekToDay_udf(spHotelsClassification["WeekDay"])
spHotelsClassification = spHotelsClassification.withColumn("WeekDay", newCol)
spHotelsClassification.show(5)

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Hotel Name", outputCol="Hotel Name_id")
indexed = indexer.fit(spHotelsClassification).transform(spHotelsClassification)
final = indexed.drop("Hotel Name").withColumnRenamed("Hotel Name_id", "Hotel Name")
final.show()

+-------------+--------------+-------+--------------------+---------+-------------+
|Snapshot Date|  Checkin Date|DayDiff|          Hotel Name|  WeekDay|Discount Code|
+-------------+--------------+-------+--------------------+---------+-------------+
|1/1/2016 0:00|1/13/2016 0:00|     12|Cassa Hotel 45th ...|Wednesday|            1|
+-------------+--------------+-------+--------------------+---------+-------------+
only showing top 1 row

+-------------+--------------+-------+--------------------+---------+-------------+
|Snapshot Date|  Checkin Date|DayDiff|          Hotel Name|  WeekDay|Discount Code|
+-------------+--------------+-------+--------------------+---------+-------------+
|1/1/2016 0:00|1/13/2016 0:00|     12|Cassa Hotel 45th ...|Wednesday|            1|
+-------------+--------------+-------+--------------------+---------+-------------+
only showing top 1 row

+-------------+------------+-------+--------------------+---------+-------------+
|Snapshot Date|Checkin Date|Da

In [14]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

vAssembler = VectorAssembler(inputCols= ['Snapshot Date','Checkin Date','Hotel Name','DayDiff','WeekDay'],outputCol="features")
vector = vAssembler.transform(final)
dataAndTarget = vector.select("features", "Discount Code")

train_data,test_data = dataAndTarget.randomSplit([0.7,0.3])
train_data.head()

Row(features=DenseVector([1420063740.0, 1420063800.0, 3.0, 30.0, 5.0]), Discount Code=4)

In [15]:
#DecisionTree

from pyspark.ml.classification import DecisionTreeClassifier

decisionTree = DecisionTreeClassifier(labelCol="Discount Code", featuresCol="features", maxBins=554, maxDepth=30)

dtmodel = decisionTree.fit(train_data)

dtpredictions = dtmodel.transform(test_data)
dtevaluator = MulticlassClassificationEvaluator(labelCol="Discount Code", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = dtevaluator.evaluate(dtpredictions)
print("Decision Tree test set accuracy = " + str(accuracy))

Decision Tree test set accuracy = 0.6806817198719612


In [25]:
#NaiveBayes

from pyspark.ml.classification import NaiveBayes

navBay = NaiveBayes(modelType="multinomial", labelCol="Discount Code")

nbmodel = navBay.fit(train_data)

nbpredictions = nbmodel.transform(test_data)
nbevaluator = MulticlassClassificationEvaluator(labelCol="Discount Code", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = nbevaluator.evaluate(nbpredictions)
print("Naive bayes test set accuracy = " + str(accuracy))

Naive bayes test set accuracy = 0.22582691697667043


In [27]:
## Results
# The results we got were no different than the results we got in excercise 2.

In [17]:
dtmodel.featureImportances

SparseVector(5, {0: 0.2329, 1: 0.2456, 2: 0.3592, 3: 0.1107, 4: 0.0516})

In [21]:
# Let's Calculate TP and FP
from pyspark.mllib.evaluation import MulticlassMetrics

decTree = dtpredictions.rdd.map(lambda x: (x["prediction"], float(x["Discount Code"])))
navBay = nbpredictions.rdd.map(lambda x: (x["prediction"], float(x["Discount Code"])))

metricsDT = MulticlassMetrics(decTree)
metricsNB = MulticlassMetrics(navBay)

print('True Positive - Decision Tree')
print(metricsDT.truePositiveRate(1.0),metricsDT.truePositiveRate(2.0),metricsDT.truePositiveRate(3.0),metricsDT.truePositiveRate(4.0))
print('True Positive - Naive Bayes')
print(metricsNB.truePositiveRate(1.0),metricsNB.truePositiveRate(2.0),metricsNB.truePositiveRate(3.0),metricsNB.truePositiveRate(4.0))
print('False Poisitive - Decision Tree')
print(metricsDT.falsePositiveRate(1.0),metricsDT.falsePositiveRate(2.0),metricsDT.falsePositiveRate(3.0),metricsDT.falsePositiveRate(4.0))
print('False Poisitive - Naive Bayes')
print(metricsNB.falsePositiveRate(1.0),metricsNB.falsePositiveRate(2.0),metricsNB.falsePositiveRate(3.0),metricsNB.falsePositiveRate(4.0))

True Positive - Decision Tree
0.6888548539114043 0.6972854400877434 0.6790084719171635 0.6393529101459469
True Positive - Naive Bayes
0.9102262016965127 0.007951741157115437 0.0018826482585503608 0.0
False Poisitive - Decision Tree
0.11611745389285578 0.1453067071115605 0.11466794075489728 0.05874439461883408
False Poisitive - Naive Bayes
0.9137805949062584 0.005645433097404786 0.0023889154323936935 0.0


In [24]:
# Let's Calculate ROC Curve

from pyspark.mllib.evaluation import BinaryClassificationMetrics

def makeBinary(predictionAndTarget, index):
    
    first = 0.0
    second = 0.0
    if predictionAndTarget[0] == index:
        first = 1.0
    if predictionAndTarget[1] == index:
        second = 1.0
    return (first, second)

#run to all the dicount code
for index in range(1,5):
    
    currPredAndLabDT = decTree.map(lambda x: makeBinary(x, index))
    currPredAndLabNB = navBay.map(lambda x: makeBinary(x, index))

    # Instantiate metrics object
    metricsDT = BinaryClassificationMetrics(currPredAndLabDT)
    metricsNB = BinaryClassificationMetrics(currPredAndLabNB)
    
    # ROC
    print("ROC Curve - Decision Tree - Discount code " + str(index) + " is " + str(metricsDT.areaUnderROC))
    print("ROC Curve - Naive Bayes - Discount code " + str(index) + " is " + str(metricsNB.areaUnderROC))

ROC Curve - Decision Tree - Discount code 1 is 0.7863687000092743
ROC Curve - Naive Bayes - Discount code 1 is 0.49822280339512715
ROC Curve - Decision Tree - Discount code 2 is 0.7759893664880915
ROC Curve - Naive Bayes - Discount code 2 is 0.5011531540298554
ROC Curve - Decision Tree - Discount code 3 is 0.7821702655811332
ROC Curve - Naive Bayes - Discount code 3 is 0.4997468664130783
ROC Curve - Decision Tree - Discount code 4 is 0.7903042577635563
ROC Curve - Naive Bayes - Discount code 4 is 0.5
