In [40]:
## Import packages
import findspark
findspark.init()

import pyspark # only run after findspark.init()
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession


## Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

## Load data
# No schema inference is requested, so every column arrives as a string.
rawTrainDF = spark.read.format("csv").option("header",True).load("kaggletitanic/train.csv")

## Get average of Age for imputation
# The cast is explicit so the aggregate does not depend on implicit
# string-to-number coercion; avg() skips nulls. first()[0] reads the single
# aggregated value without depending on the auto-generated column name.
meanAge = rawTrainDF.agg(fn.avg(fn.col("Age").cast("double"))).first()[0]

## Impute null Age
rawTrainDF = rawTrainDF.na.fill({"Age": round(meanAge,1)})

## Keep the identifier, the label, the categorical features (left as strings)
## and the numeric features cast to float
rawTrainDF = rawTrainDF.select(
    fn.col("PassengerId"),
    fn.col("Survived"),
    fn.col("Sex"),
    fn.col("Embarked"),
    fn.col("Pclass").cast("float"),
    fn.col("Age").cast("float"),
    fn.col("SibSp").cast("float"),
    fn.col("Fare").cast("float"),
)

## Hold out 30% of the labeled rows for evaluation
# A fixed seed keeps the split, and therefore the reported metrics,
# reproducible across runs.
(trainDF, testDF) = rawTrainDF.randomSplit([0.7,0.3], seed=42)


In [41]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## List categorical features
categoricalFeatures = ["Sex", "Embarked"]

## Index categ features
# handleInvalid="keep" routes null/unseen categories to an extra index instead
# of raising at fit/transform time.
indexer = [StringIndexer(inputCol=col, outputCol=col+"_indexed",handleInvalid="keep") for col in categoricalFeatures]

## Index the label
# The default ordering is by descending frequency, which maps "0" -> 0.0 and
# "1" -> 1.0 only while "0" is the majority class in the sampled training
# split. Alphabetical ordering makes the mapping deterministic, so the
# numeric `prediction` can be read directly as the Survived value.
labelIndexer = [StringIndexer(inputCol="Survived", outputCol="Survived_indexed", stringOrderType="alphabetAsc")]

## One Hot Encode indexed features
encoder = [OneHotEncoder(inputCol=col+"_indexed", outputCol=col+"_encoded") for col in categoricalFeatures]

## List required features to feed the model
requiredFeatures = ["Pclass","Sex_encoded","Age","SibSp","Fare","Embarked_encoded"]

## Create the vector structured data (label, features)
assembler = VectorAssembler(inputCols=requiredFeatures, outputCol="features")

## Train a RandomForest model
# Fixed seed so the fitted forest is reproducible across runs.
rf = RandomForestClassifier(labelCol="Survived_indexed", featuresCol="features", seed=42)

## Create a pipeline to chain indexers and RF
pipeline = Pipeline(stages=labelIndexer + indexer + encoder + [assembler, rf])

## Train
model = pipeline.fit(trainDF)

## Predict
predictions = model.transform(testDF)


In [67]:
## Model evaluation

# Keep the indexed label next to the prediction: the classifier was trained on
# "Survived_indexed", so `prediction` lives in that index space. Scoring
# against it stays correct whatever index the label indexer assigned to each
# class. The raw label is kept (cast to float) for inspection.
predictions = predictions.select(
    fn.col("Survived").cast("Float"),
    fn.col("Survived_indexed"),
    fn.col("prediction"),
)

# One evaluator is reused for every metric; the metric name is overridden per
# call through the param map instead of building four near-identical objects.
evaluator = MulticlassClassificationEvaluator(labelCol="Survived_indexed", predictionCol="prediction")
metricNames = ["accuracy", "f1", "weightedPrecision", "weightedRecall"]
metrics = {name: evaluator.evaluate(predictions, {evaluator.metricName: name}) for name in metricNames}

accuracy = metrics["accuracy"]
print("Accuracy = %g" % accuracy)

f1 = metrics["f1"]
print("f1 = %g" % f1)

wp = metrics["weightedPrecision"]
print("weightedPrecision = %g" % wp)

wr = metrics["weightedRecall"]
print("weightedRecall = %g" % wr)

Accuracy = 0.83274
f1 = 0.827945
weightedPrecision = 0.830243
weightedRecall = 0.83274


In [70]:
## Score the unlabeled dataset

rawTestDF = spark.read.format("csv").option("header",True).load("kaggletitanic/test.csv")

## Same column selection and casts as the training frame, minus the label
rawTestDF = rawTestDF.select(
    fn.col("PassengerId"),
    fn.col("Sex"),
    fn.col("Embarked"),
    fn.col("Pclass").cast("float"),
    fn.col("Age").cast("float"),
    fn.col("SibSp").cast("float"),
    fn.col("Fare").cast("float"),
)

## Get averages of Age and Fare for imputation
# The statistics are taken from the labeled training frame (rawTrainDF, built
# in the first cell) rather than from the data being scored, so unseen rows
# are preprocessed with the same values the model was fitted on. Both means
# come from a single aggregation job and are read by position.
imputationStats = rawTrainDF.agg(fn.avg(fn.col("Age")), fn.avg(fn.col("Fare"))).first()
meanAge = imputationStats[0]
meanFare = imputationStats[1]

## Impute null Age and null Fare
rawTestDF = rawTestDF.na.fill({"Age": round(meanAge,1), "Fare": round(meanFare,1)})

testPred = model.transform(rawTestDF)

## Export to CSV
# `prediction` is a double column; cast it to int so the file holds 0/1
# rather than 0.0/1.0 in the Survived column.
submissionDF = testPred.select("PassengerId", fn.col("prediction").cast("int").alias("Survived"))
submissionDF.toPandas().to_csv(path_or_buf="gender_submission.csv",index=False)