In [51]:
# import modules
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

# build spark session and spark context
spark = SparkSession.builder \
        .master("local[4]") \
        .appName("hotel") \
        .getOrCreate()
sc = spark.sparkContext

df = spark.read.csv('hotel_bookings.csv',  inferSchema=True, header = True)

In [2]:
#Parameters for defining code below
target = 'is_canceled'
cancel_label = 1
noncancel_label = 0

In [3]:
#Balancing a DataFrame with Downsampling

def downsample(df, target, cancel_label, noncancel_label):
    """
    df               spark dataframe
    target           str, target variable
    cancel_label     int, value of canceled booking
    noncancel_label  int, value of non-canceled booking
    
    """

    ### ENTER CODE HERE
    
    from pyspark.sql.functions import col
    
    #count of canceled and non-canceled labels
    cancel_n = df.filter(col(target) == cancel_label).count()
    noncancel_n = df.filter(col(target) == noncancel_label).count()
    
    #df split by having either the poitive or negative labels
    df_cancel = df.filter(col(target) == cancel_label)
    df_noncancel = df.filter(col(target) == noncancel_label)
    
    
    if cancel_n > noncancel_n:
        #amount to sample from is fraction of low noncancel/full cancel
        df_a = df_cancel.sample(fraction = (noncancel_n/cancel_n))
        #combine df_cancel sample with full df_noncancel
        df_b = df_noncancel.union(df_a)
    elif noncancel_n > cancel_n:
        #amount to sample from is fraction of low cancel/full non-cancel
        df_a = df_noncancel.sample(fraction = (cancel_n/noncancel_n))
        #combine df_noncancel sample with full df_cancel
        df_b = df_cancel.union(df_a)
    else:
        #if count of df_cancel = df_noncancel, then just use original df
        df_b = df

    return df_b

In [4]:
# Call your downsample function here, and show the count by label
df_downsample = downsample(df, target, cancel_label, noncancel_label)
df_downsample.groupBy(target).count().show()

+-----------+-----+
|is_canceled|count|
+-----------+-----+
|          1|44224|
|          0|44185|
+-----------+-----+



In [5]:
from pyspark.sql.functions import col

# replace the strings "NULL" and "NA" with null value
df_withNull = df_downsample.replace('NULL', None).replace('NA', None)

# replace null values in 'children' to 0 since there are only 4
df2 = df_withNull.fillna({'children':0})

# replace 'children' datatype to int
df2 = df2.withColumn('children', col('children').cast("Int"))

# drop 'company' and 'agent' due to high null count
df2 = df2.drop('agent', 'company','country', 'arrival_date_week_number', 'reservation_status')

In [6]:
# addressing reservation_status_date, which gives the date at which the last 
# reservation status was set. I'm transforming it into number of days since reservation
# status was set, so day of arrival - reservation_status_date
# from pyspark.sql.types import DateType

# convert reservation_status_date into datetype dtype
# temp = df2.withColumn("reservation_status_date", df2["reservation_status_date"].cast(DateType()))

# need to combine arrival_date_year, arrival_date_month, and arrival_date_day_of_month
# into one column and cast it to DateType, then replacing reservation_status_date column
# with number of days since reservation.
# until then, dropping reservation_status_date
df2 = df2 .drop('reservation_status_date')

In [7]:
#numerically encode all columns of type string
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel, StringIndexer

# list of columns to numerically encode
col_string=['hotel', 'meal','market_segment',
            'distribution_channel','reserved_room_type','assigned_room_type',
            'deposit_type','customer_type']
col_stringwmonth = col_string+['arrival_date_month']


# col_num = list of new column names being changed into numeric
col_num=[x+"_NUMERIC" for x in col_string]
# don't need to change dates into numeric col_num=col_num+['arrival_date_year','arrival_date_day_of_month']

# col_oh = list of columns being one-hot encoded
col_oh=[x+"_oh" for x in col_string]
# dont need to ohencode dates col_oh=col_oh+['arrival_date_year_oh','arrival_date_day_of_month_oh']

In [8]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_NUMERIC").fit(df2) for column in col_stringwmonth]
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df2).transform(df2)

In [9]:
#one-hot encode all columns in col_num
ohe = OneHotEncoder(dropLast=False)
ohe.setInputCols(col_num)
ohe.setOutputCols(col_oh)
model = ohe.fit(df_indexed)

df_casted=model.transform(df_indexed)

In [10]:
# drop the original, non-ohencoded variables
df_encoded=df_casted.drop(*col_stringwmonth)
df_encoded=df_encoded.drop(*col_num)

In [11]:
#removed normalization step for now
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import Normalizer

norm_to_columns = ['lead_time', 'stays_in_weekend_nights', 'stays_in_week_nights', 'adults', 'children', 'babies', 
                'previous_cancellations', 'previous_bookings_not_canceled', 'booking_changes', 'adr', 
                'required_car_parking_spaces', 'total_of_special_requests']

# combine all to-norm columns into one vector named "norm_features"
assembler = VectorAssembler(inputCols=norm_to_columns, outputCol="norm_features")
transformed = assembler.transform(df_encoded) 
transformed = transformed.drop(*norm_to_columns)

# in the end, new column named normFeatures combined all features that are normalized
normalizer = Normalizer(inputCol="norm_features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(transformed)
l1NormData = l1NormData.drop(*norm_to_columns + ['norm_features'])
print("Normalized using L^1 norm")

Normalized using L^1 norm


In [12]:
# combine all feature columns (non-label columns) into one vector named "feature"
assembler = VectorAssembler(inputCols=df_encoded.columns[1:], outputCol="features")
all_transformed = assembler.transform(df_encoded)

# convert to rdd with 2 columns, label and features, where features is a DenseVector combination of all other features
dataRdd = all_transformed.select(col("is_canceled").alias("label"), col("features")).rdd.map(tuple)

In [13]:
from pyspark.mllib.regression import LabeledPoint

# map features as floats, then input all into a DenseVector, as well as mapping labels as floats
# then map into LabeledPoint object
lp = dataRdd.map(lambda row: (float(row[0]), Vectors.dense([float(c) for c in row[1]])))\
            .map(lambda row: LabeledPoint(row[0], row[1]))
lp.take(1)

[LabeledPoint(1.0, [85.0,2015.0,1.0,0.0,3.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,82.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0])]

In [14]:
#type(lp.take(1))

In [15]:
# 70/30 test train split
seed = 314
train, test = lp.randomSplit([0.7, 0.3], seed=seed)

In [16]:
# use these variables in place of test/train.count() bc those methods take a long time, instead just call once
test_count = test.count()
train_count = train.count()

In [17]:
#from pyspark.mllib.tree import RandomForest
#from pyspark.mllib.util import MLUtils

#data = MLUtils.loadLibSVMFile(sc, 'hotel_bookings.csv')
#data.take(2)

In [42]:
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils

# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(train, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=100, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=5, maxBins=32)

In [52]:
# Evaluate model on test instances and compute test error
predictions = model.predict(test.map(lambda x: x.features))
labelsAndPredictions = test.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda p: p[0] != p[1]).count() / float(test_count)
print('Test Error = ' + str(testErr))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 82.0 failed 1 times, most recent failure: Lost task 2.0 in stage 82.0 (TID 588, jupyter-kliu01, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(StringIndexerModel$$Lambda$3244/1026762699: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:156)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:148)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
Caused by: org.apache.spark.SparkException: Unseen label: Gr. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:405)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:390)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(StringIndexerModel$$Lambda$3244/1026762699: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:156)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:148)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:607)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
Caused by: org.apache.spark.SparkException: Unseen label: Gr. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:405)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:390)
	... 14 more


In [38]:
type(labelsAndPredictions)

pyspark.rdd.RDD

In [48]:
predictions.take(1)

[0.0]

In [44]:
# Evaluating the model on test data
labelsAndPreds_te = test.map(lambda p: (p.label, float(model.predict(p.features))))
accuracy_te = 1.0 * labelsAndPreds_te.filter(lambda p: p[0] == p[1]).count() / test_count
#print('model accuracy (test): {}'.format(accuracy_te))

Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/pyspark/serializers.py", line 468, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/opt/conda/lib/python3.7/site-packages/pyspark/cloudpickle.py", line 1097, in dumps
    cp.dump(obj)
  File "/opt/conda/lib/python3.7/site-packages/pyspark/cloudpickle.py", line 357, in dump
    return Pickler.dump(self, obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 786, in save_tuple
    save(element)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/site-packages/pyspark/cloudpickle.py", line 501, in save_function
    self.save_function_tuple(obj)
  File "/opt/conda/lib/python3.7/site-package

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

In [41]:
from pyspark.mllib.evaluation import MulticlassMetrics

metrics = MulticlassMetrics(labelsAndPreds_te)
labelsAndPreds_te.take(3)
print("Confusion Matrix:\n{}".format(metrics.confusionMatrix().toArray()))

Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/pyspark/serializers.py", line 468, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/opt/conda/lib/python3.7/site-packages/pyspark/cloudpickle.py", line 1097, in dumps
    cp.dump(obj)
  File "/opt/conda/lib/python3.7/site-packages/pyspark/cloudpickle.py", line 357, in dump
    return Pickler.dump(self, obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 786, in save_tuple
    save(element)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/site-packages/pyspark/cloudpickle.py", line 501, in save_function
    self.save_function_tuple(obj)
  File "/opt/conda/lib/python3.7/site-package

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

### Logistic Regression

In [16]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# train logistic regression model with training data
model = LogisticRegressionWithLBFGS.train(train)

In [17]:
# Evaluating the model on training data
labelsAndPreds_tr = train.map(lambda p: (p.label, model.predict(p.features)))
accuracy_tr = 1.0 * labelsAndPreds_tr.filter(lambda pl: pl[0] == pl[1]).count() / train_count
print('model accuracy (train): {}'.format(accuracy_tr))

model accuracy (train): 0.7654372955995208


In [18]:
# Evaluating the model on test data
labelsAndPreds_te = test.map(lambda p: (p.label, model.predict(p.features)))
accuracy_te = 1.0 * labelsAndPreds_te.filter(lambda pl: pl[0] == pl[1]).count() / test_count
print('model accuracy (test): {}'.format(accuracy_te))

model accuracy (test): 0.7631966192506509
