<br><br><br><br><br><h1 style="font-size:2em;color:#2467C0">Predict survival on the Titanic and get familiar with ML basics</h1><br><br><br>

In [55]:
#from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.sql.types import FloatType
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
from pyspark.sql.functions import when, udf, col, collect_list
import numpy as np

In [56]:
# Load the data stored in CSV format as DataFrame.
trainData = spark.read.format("org.apache.spark.csv") \
            .option("header","true").option("inferSchema", "true") \
            .csv("data/train.csv")
testData = spark.read.format("org.apache.spark.csv") \
            .option("header","true").option("inferSchema", "true") \
            .csv("data/test.csv")
submissionData = spark.read.format("org.apache.spark.csv") \
                .option("header","true").option("inferSchema", "true") \
                .csv("data/gender_submission.csv")

In [57]:
trainData.count()
testData.count()
trainData.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [58]:
trainData.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
PassengerId,891,446.0,257.3538420152301,1,891
Survived,891,0.3838383838383838,0.48659245426485753,0,1
Pclass,891,2.308641975308642,0.8360712409770491,1,3
Name,891,,,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""","van Melkebeke, Mr. Philemon"
Sex,891,,,female,male
Age,714,29.69911764705882,14.526497332334035,0.42,80.0
SibSp,891,0.5230078563411896,1.1027434322934315,0,8
Parch,891,0.38159371492704824,0.8060572211299488,0,6
Ticket,891,260318.54916792738,471609.26868834975,110152,WE/P 5735


In [59]:
testData.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
PassengerId,418,1100.5,120.81045760473994,892,1309
Pclass,418,2.2655502392344498,0.8418375519640503,1,3
Name,418,,,"""Assaf Khalil, Mrs. Mariana (Miriam"""")""""""","van Billiard, Master. Walter John"
Sex,418,,,female,male
Age,332,30.272590361445783,14.181209235624424,0.17,76.0
SibSp,418,0.4473684210526316,0.8967595611217135,0,8
Parch,418,0.3923444976076555,0.9814288785371694,0,9
Ticket,418,223850.98986486485,369523.7764694362,110469,W.E.P. 5734
Fare,417,35.6271884892086,55.907576179973844,0.0,512.3292


In [60]:
# Create new Int field "SexInt" based on the existing String field "Sex"
# Sex "Male" = 1
# Sex "Female" = 0
newTrainData = trainData.withColumn('SexInt', 
                                    when(trainData.Sex == 'female', 0)
                                    .otherwise(1)
                                    )
newTrainData = newTrainData.withColumn('embarkedInt', 
                                       when(newTrainData.Embarked == 'C', 0)
                                       .when(newTrainData.Embarked == 'Q', 1)
                                       .otherwise(2)
                                      )
                                      

newTestData = testData.withColumn('SexInt', 
                                    when(testData.Sex == 'female', 0)
                                    .otherwise(1)
                                    )
newTestData = newTestData.withColumn('embarkedInt', 
                                       when(newTestData.Embarked == 'C', 0)
                                       .when(newTestData.Embarked == 'Q', 1)
                                       .otherwise(2)
                                      )
newTrainData.printSchema()
newTrainData.show(50)
#newTestData.count()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- SexInt: integer (nullable = false)
 |-- embarkedInt: integer (nullable = false)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+------+-----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|      Cabin|Embarked|SexInt|embarkedInt|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----------+--------+------+-----------+
|          1|       0|     3|

In [61]:
# process test data
# extract Mr, Mrs, Miss, Master
newTrainData = newTrainData.withColumn('title', 
                                     when(col('Name').like('%Master.%'), 'Master.')
                                     .when(col('Name').like('%Miss.%')
                                           | col('Name').like('%Mlle.%')
                                           | col('Name').like('%Ms.%'), 'Miss.')
                                     .when(col('Name').like('%Mrs.%') 
                                           | col('Name').like('%Mme.%')
                                           | col('Name').like('%Lady.%'), 'Mrs.')
                                     .when(col('Name').like('%Mr.%') 
                                           | col('Name').like('%Sir.%')
                                           | col('Name').like('%Major.%'), 'Mr.')
                                     .when(col('Name').like('%Rev.%'), 'Rev.')    
                                     .when(col('Name').like('%Dr.%'), 'Dr.')                                     
                                     .otherwise('Unknown.')
                                    )
newTestData = newTestData.withColumn('title', 
                                     when(col('Name').like('%Master.%'), 'Master.')
                                     .when(col('Name').like('%Miss.%')
                                           | col('Name').like('%Mlle.%')
                                           | col('Name').like('%Ms.%'), 'Miss.')
                                     .when(col('Name').like('%Mrs.%') 
                                           | col('Name').like('%Mme.%')
                                           | col('Name').like('%Lady.%'), 'Mrs.')
                                     .when(col('Name').like('%Mr.%') 
                                           | col('Name').like('%Sir.%')
                                           | col('Name').like('%Major.%'), 'Mr.')
                                     .when(col('Name').like('%Rev.%'), 'Rev.')    
                                     .when(col('Name').like('%Dr.%'), 'Dr.')                                     
                                     .otherwise('Unknown.')
                                    )

In [62]:
# Median age
newTrainData.approxQuantile('Age', [0.5], 0)
newTrainData.registerTempTable("df")
df2 = sqlContext.sql("select title, percentile(Age,0.5) as median_age \
                      from df group by title")
df2.show()
#newTrainData = newTrainData.withColumn('Age', 
#                        when(col('Age').isNull(), df2.select('approxQuantile').where('title' == col('title')))
#                        .otherwise(col('Age')))
#newTrainData.show(50)


+--------+----------+
|   title|median_age|
+--------+----------+
|   Miss.|      21.0|
|    Rev.|      46.5|
|Unknown.|      48.0|
|     Mr.|      30.0|
|    Mrs.|      35.0|
|     Dr.|      46.5|
| Master.|       3.5|
+--------+----------+



In [63]:
df2.where(df2.title == 'Mr.').select('median_age').collect()[0][0]

30.0

In [64]:
#newTrainData = newTrainData.withColumn('Age2', 
#                        when(col('Age').isNull(), 
#                             df2.where(df2.title1 == newTrainData.title).select('median_age').collect()[0][0])
#                        .otherwise(col('Age')))
newTrainData = newTrainData.join(df2,['title'],"inner")
newTrainData = newTrainData.withColumn('Age',
                                      when(col('Age').isNull(),
                                          col('median_age'))
                                      .otherwise(col('Age')))

newTestData = newTestData.join(df2,['title'],"inner")
newTestData = newTestData.withColumn('Age',
                                      when(col('Age').isNull(),
                                          col('median_age'))
                                      .otherwise(col('Age')))
#newTrainData.show(50)

In [65]:
newTrainData.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
title,891,,,Dr.,Unknown.
PassengerId,891,446.0,257.3538420152301,1,891
Survived,891,0.3838383838383838,0.48659245426485753,0,1
Pclass,891,2.308641975308642,0.8360712409770491,1,3
Name,891,,,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""","van Melkebeke, Mr. Philemon"
Sex,891,,,female,male
Age,891,29.390202020202018,13.265321985344801,0.42,80.0
SibSp,891,0.5230078563411896,1.1027434322934315,0,8
Parch,891,0.38159371492704824,0.8060572211299488,0,6


In [66]:
newTestData.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
title,418,,,Dr.,Unknown.
PassengerId,418,1100.5,120.81045760473994,892,1309
Pclass,418,2.2655502392344498,0.8418375519640503,1,3
Name,418,,,"""Assaf Khalil, Mrs. Mariana (Miriam"""")""""""","van Billiard, Master. Walter John"
Sex,418,,,female,male
Age,418,29.75956937799043,13.033881536964937,0.17,76.0
SibSp,418,0.4473684210526316,0.8967595611217135,0,8
Parch,418,0.3923444976076555,0.9814288785371694,0,9
Ticket,418,223850.98986486485,369523.7764694362,110469,W.E.P. 5734


In [67]:
featureColumns = ['Pclass', 'SexInt', 'SibSp', 'Parch', 'embarkedInt']

In [68]:
# Delete non useful fields
newTrainData = newTrainData.drop('Cabin', 'Ticket', 'PassengerId', 'Embarked')
newTestData = newTestData.drop('Cabin', 'Ticket', 'Embarked')
#newTrainData = newTrainData.na.drop()
#newTestData = newTestData.na.drop()
#newTrainData.printSchema()
#newTestData.count()

In [69]:
newTrainData.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
title,891,,,Dr.,Unknown.
Survived,891,0.3838383838383838,0.48659245426485753,0,1
Pclass,891,2.308641975308642,0.8360712409770491,1,3
Name,891,,,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""","van Melkebeke, Mr. Philemon"
Sex,891,,,female,male
Age,891,29.390202020202018,13.265321985344801,0.42,80.0
SibSp,891,0.5230078563411896,1.1027434322934315,0,8
Parch,891,0.38159371492704824,0.8060572211299488,0,6
Fare,891,32.2042079685746,49.69342859718089,0.0,512.3292


In [70]:
newTestData = newTestData.withColumn('Fare',
                                      when(col('Fare').isNull(),
                                          35.6271)
                                      .otherwise(col('Fare')))
newTestData.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
title,418,,,Dr.,Unknown.
PassengerId,418,1100.5,120.81045760473994,892,1309
Pclass,418,2.2655502392344498,0.8418375519640503,1,3
Name,418,,,"""Assaf Khalil, Mrs. Mariana (Miriam"""")""""""","van Billiard, Master. Walter John"
Sex,418,,,female,male
Age,418,29.75956937799043,13.033881536964937,0.17,76.0
SibSp,418,0.4473684210526316,0.8967595611217135,0,8
Parch,418,0.3923444976076555,0.9814288785371694,0,9
Fare,418,35.62718827751193,55.8405004795412,0.0,512.3292


In [71]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembledTrainData = assembler.transform(newTrainData)
assembledTestData = assembler.transform(newTestData)
assembledTrainData.count()
assembledTestData.count()

418

In [72]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="Survived", outputCol="indexedLabel").fit(assembledTrainData)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(assembledTrainData)

In [73]:
# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

In [74]:
#pipeline = Pipeline(stages=[dt])
#model = pipeline.fit(assembled)
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
# Train model.  This also runs the indexers.
model = pipeline.fit(assembledTrainData)

In [75]:
# Make predictions.
predictions = model.transform(assembledTestData)

In [76]:
predictions.printSchema()
predictions.count(), submissionData.count()
predictions.select("PassengerId", "prediction").write.csv('mycsv.csv')

root
 |-- title: string (nullable = false)
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- SexInt: integer (nullable = false)
 |-- embarkedInt: integer (nullable = false)
 |-- median_age: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- indexedFeatures: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [77]:
extractedPredictions = predictions.select("PassengerId", "prediction")
output = submissionData.join(extractedPredictions,['PassengerId'],"inner")
predictionAndLabels = output.select("prediction", "Survived")

In [78]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictionAndLabels)
print("Test accuracy = %g " % (accuracy))

treeModel = model.stages[2]

Test accuracy = 0.894737 
