In [1]:
import pandas as pd
from pyspark.sql.functions import lit
from pyspark.sql.functions import *
from pyspark.sql.functions import Column
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, MultilayerPerceptronClassifier
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# ## Load data
# Both CSVs are read with identical reader options, so they live in one place.
CSV_READ_OPTIONS = dict(format='com.databricks.spark.csv', header='true', inferSchema='true')

train_data = sqlContext.read.load('/FileStore/tables/train.csv', **CSV_READ_OPTIONS)
original_test_data = sqlContext.read.load('/FileStore/tables/test.csv', **CSV_READ_OPTIONS)

# The test file has no label column; add a placeholder "Survived" so both frames share it.
# NOTE(review): the placeholder is the *string* 'none', whereas the classifiers below use
# "Survived" as a numeric label — cast/replace it before unioning with train_data.
test_data_survived = original_test_data.withColumn("Survived", lit('none'))

# ## Preprocessing
# Extract the title token from "Name": take the text after the first comma, split it on
# spaces, and keep the token after the leading space (presumably the honorific, e.g. "Mr.").
# The column is derived in place. Previously the title was computed in a separate
# DataFrame and joined back on monotonically_increasing_id(). Those ids depend on
# partitioning and are not guaranteed to line up between two separately evaluated plans,
# so that join could silently drop or mismatch rows. It also cost a shuffle.
train_data = train_data.withColumn("title", split(split("Name", ",")[1], " ")[1])

# Group the raw titles into four buckets: Sir, Lady, Boy and girl.
# This uses an exact-value lookup. It replaces a chain of regexp_replace calls whose
# unescaped "." matched any character and whose substring matching made the outcome
# depend on statement order (e.g. "Mr." also matches inside "Mrs.", "Don." inside "Dona.").
# Titles not listed here (including "Dr.", handled below) are left unchanged.
TITLE_GROUPS = {
    "Miss.": "girl",
    "Ms.": "girl",
    "Mme.": "Lady",
    "Mlle.": "Lady",
    "Dona.": "Lady",
    "Lady.": "Lady",
    "Mrs.": "Lady",
    "the": "Lady",  # presumably "the Countess": the split above keeps only "the"
    "Capt.": "Sir",
    "Don.": "Sir",
    "Major.": "Sir",
    "Sir.": "Sir",
    "Jonkheer.": "Sir",
    "Col.": "Sir",
    "Rev.": "Sir",
    "Mr.": "Sir",
    "Master.": "Boy",
}
# Literal Spark map {raw title -> group}; coalesce keeps unmapped (and null) titles as-is.
title_lookup = create_map([lit(v) for pair in TITLE_GROUPS.items() for v in pair])
titles_grouped = train_data.withColumn("title", coalesce(title_lookup[col("title")], col("title")))

# "Dr." is gender-neutral, so it is bucketed by the passenger's Sex instead.
is_doctor = col("title") == "Dr."
train_data_withtitle = titles_grouped.withColumn(
    "title",
    when(is_doctor & (col("Sex") == "male"), "Sir")
    .when(is_doctor & (col("Sex") == "female"), "Lady")
    .otherwise(col("title")),
)

'''finding family size for each name'''
def add(x,y):
    """Return a passenger's family size.

    x: sibling/spouse count (the SibSp column at the call site),
    y: parent/child count (the Parch column).
    The trailing +1 counts the passenger themself.
    """
    relatives = x + y
    return relatives + 1
  
# The UDF is still defined so any later cell that calls it keeps working.
# The column below no longer uses it.
column_add = udf(add, IntegerType())

# familysize = SibSp + Parch + 1 (the passenger).
# A native column expression runs inside the JVM instead of round-tripping every row
# through a Python UDF. It yields null, rather than a failed task, if a count is missing.
# The column is spelled "SibSp" here; the old 'SibSP' only resolved because Spark
# matches column names case-insensitively by default.
train_data_withfamily = train_data_withtitle.withColumn("familysize", col("SibSp") + col("Parch") + 1)

# ## Handling missing / invalid values
# Fares recorded as 0.0, and any missing fare, are replaced by the mean fare.
# Missing ages are replaced by the mean age.
# The means are computed here instead of hard-coded. The literals previously used
# (32.2042079685746 and 29.69911764705882) appear to come from these same avg() queries.
# Computing them keeps the cell correct if the input data changes.
# NOTE(review): the averages are taken over all of train_data. That includes rows the
# later randomSplit assigns to the held-out test_data, so evaluation sees a small leak.
imputation_stats = train_data_withfamily.select(
    avg("Fare").alias("mean_fare"),
    avg("Age").alias("mean_age"),  # avg() skips nulls: mean of the known ages
).first()
mean_fare = imputation_stats["mean_fare"]
mean_age = imputation_stats["mean_age"]

fare = col("Fare")
train_data_fare = train_data_withfamily.withColumn(
    "Fare", when(fare.isNull() | (fare == 0.0), mean_fare).otherwise(fare)
)
age = col("Age")
train_data_fare = train_data_fare.withColumn(
    "Age", when(age.isNull(), mean_age).otherwise(age)
)

# ## Feature encoding
# Processing steps:
# 1. StringIndexer turns the categorical columns (title, Sex) into numeric indices.
# 2. OneHotEncoder turns those indices into vectors.
# 3. VectorAssembler packs them, together with Pclass, Fare, familysize and Age, into
#    one "features" column.
# The stages run through a Pipeline, which fits Estimators and applies Transformers.
# This works whether OneHotEncoder is a plain Transformer (Spark 2.x) or an Estimator
# that must be fit first (Spark 3.x). Calling encoder.transform(...) directly only works
# on the former.
feature_pipeline = Pipeline(stages=[
    StringIndexer(inputCol="title", outputCol="titleIndex"),
    StringIndexer(inputCol="Sex", outputCol="Sexindex"),
    OneHotEncoder(inputCol="titleIndex", outputCol="titleVec"),
    OneHotEncoder(inputCol="Sexindex", outputCol="SexVec"),
    VectorAssembler(
        inputCols=["titleVec", "SexVec", "Pclass", "Fare", "familysize", "Age"],
        outputCol="features",
    ),
])
feature_model = feature_pipeline.fit(train_data_fare)
train_features = feature_model.transform(train_data_fare)
train_features.select("features").show(truncate=False)


In [2]:
# ## Model building: random forest
# Hold out 30% of the labelled rows for evaluation (seeded, so the split is reproducible).
(training_data, test_data) = train_features.randomSplit([0.7, 0.3], seed = 100)
# Only a cell's last expression is displayed. Bare .count() calls here would run Spark
# jobs whose results are thrown away, so the sizes are printed explicitly.
print("training rows:", training_data.count(), "| test rows:", test_data.count())

# Seed the forest as well, so its random sampling (and the accuracy below) is reproducible.
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", seed=100)
rfModel = rf.fit(training_data)
rf_predictions = rfModel.transform(test_data)

# Accuracy on the held-out split. This evaluator is reused by the later cells.
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(rf_predictions)  # 0.7870036101083032 was recorded before the forest was seeded

In [3]:
# ### Random forest tuning
# 5-fold cross-validation over tree depth, bin count and forest size.
# That is 3 x 2 x 2 = 12 candidate models, scored by the accuracy evaluator above.
# (ParamGridBuilder / CrossValidator are already imported in the first cell.)
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

# A fixed seed makes the fold assignment reproducible across re-runs.
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=100)

rfcv_model = cv.fit(training_data)
rf_cvPredictions = rfcv_model.transform(test_data)
evaluator.evaluate(rf_cvPredictions)  # 0.8194945848375451 was recorded before the CV seed was fixed


In [4]:
# ## Decision tree
# A single tree of depth 3, trained and scored on the same 70/30 split as the forest.
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="Survived",
    maxDepth=3,
)
dt_model = dt.fit(training_data)
dt_predictions = dt_model.transform(test_data)
evaluator.evaluate(dt_predictions)  # accuracy: 0.8194945848375451


In [5]:
# ### Decision tree tuning
# 5-fold CV over tree depth and bin count: 4 x 3 = 12 candidate trees, scored by accuracy.
# (ParamGridBuilder / CrossValidator are already imported in the first cell.)
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

# A fixed seed makes the fold assignment reproducible across re-runs.
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=100)
dt_cvmodel = cv.fit(training_data)
dt_cvpredictions = dt_cvmodel.transform(test_data)
evaluator.evaluate(dt_cvpredictions)  # 0.8267148014440433 was recorded before the CV seed was fixed

In [6]:
# Size of the best decision tree chosen by the dt-tuning cross-validation.
# (`cv_model` was never defined, so this cell raised NameError on a fresh kernel.
# Also, only the last of the two bare expressions was displayed.)
best_dt = dt_cvmodel.bestModel
(best_dt.numNodes, best_dt.depth)

In [7]:
# ## Logistic regression
# Baseline logistic model capped at 10 iterations, scored on the same held-out split.
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol="Survived",
    maxIter=10,
)
lr_model = lr.fit(training_data)
lr_predictions = lr_model.transform(test_data)
evaluator.evaluate(lr_predictions)  # accuracy: 0.7870036101083032


In [8]:
# ### Logistic regression tuning
# 3-fold CV over three parameters, 3 x 3 x 3 = 27 candidates:
# - regularisation strength (regParam)
# - elastic-net mix (elasticNetParam: 0 = L2, 1 = L1)
# - iteration budget (maxIter)
# (ParamGridBuilder / CrossValidator are already imported in the first cell.)
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3, seed=100)

# Fit the *logistic-regression* validator. This cell previously called cv.fit(...), which
# is the decision-tree CrossValidator from the dt-tuning cell. The score recorded then
# (0.8267148014440433, identical to the tuned tree's) was not a logistic-regression result.
lr_cvModel = lr_cv.fit(training_data)

lr_cvpredictions = lr_cvModel.transform(test_data)
evaluator.evaluate(lr_cvpredictions)

In [9]:
# ## Multilayer perceptron
# NOTE(review): this model uses its own 60/40 split (seed 1234), not the 70/30
# training_data/test_data split used above, so its accuracy isn't directly comparable.
(train, test) = train_features.randomSplit([0.6, 0.4], 1234)

# The input layer must equal the length of the assembled "features" vector.
# The previous len("features") measured the 8-character column *name*, so it only
# worked while the vector happened to have exactly 8 entries.
num_features = len(train_features.select("features").first()["features"])
layers = [num_features, 20, 10, 2]  # two hidden layers; 2 output units (assumes 0/1 Survived labels)
train_mt = MultilayerPerceptronClassifier(labelCol="Survived", featuresCol="features", maxIter=100,
                                          layers=layers, blockSize=128, seed=1234)

mt_model = train_mt.fit(train)
mt_predictions = mt_model.transform(test)
predictions_and_labels = mt_predictions.select("prediction", "Survived")
evaluator.evaluate(predictions_and_labels)  # ~ 0.75 - 0.78