# Titanic Survival Prediction
https://www.kaggle.com/c/titanic

##### Stephen Shepherd, 2020-07-21

I use Spark even though this is a small dataset so that I can use the code on larger datasets in the future if needed.

# Setup

In [3]:
import pyspark.sql.functions as F
import pyspark
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.feature import VectorSlicer

import pandas as pd
import numpy as np
#import mlflow

In [4]:
%fs

ls FileStore/tables/Titanic

path,name,size
dbfs:/FileStore/tables/Titanic/gender_submission.csv,gender_submission.csv,3258
dbfs:/FileStore/tables/Titanic/test.csv,test.csv,28629
dbfs:/FileStore/tables/Titanic/train.csv,train.csv,61194


In [5]:
files = [f.path for f in dbutils.fs.ls('FileStore/tables/Titanic')]

print(files, end='\n\n')

for f in files:
  print(f)
  df = spark.read.option("header", True).option("inferSchema", True).csv(f)
  print(df.count())
  df.printSchema()
  print()

# Exploration

In [7]:
f = 'dbfs:/FileStore/tables/Titanic/train.csv'

train = (
  spark.read.option("header", True).option("inferSchema", True).csv('dbfs:/FileStore/tables/Titanic/train.csv')
  .withColumn("kaggle_test_train", F.lit("train"))
)

## Union in Kaggle Test set
train = (
  spark.read.option("header", True).option("inferSchema", True).csv('dbfs:/FileStore/tables/Titanic/test.csv')
  .withColumn("Survived", F.lit(None))
  .withColumn("kaggle_test_train", F.lit("test"))
  .select(train.columns)
  .unionAll(train)
  .orderBy(F.rand())
)

print(train.persist().count())

display(train.orderBy(F.rand()).limit(100))

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,kaggle_test_train
258,1.0,1,"Cherry, Miss. Gladys",female,30.0,0,0,110152,86.5,B77,S,train
1131,,1,"Douglas, Mrs. Walter Donald (Mahala Dutton)",female,48.0,1,0,PC 17761,106.425,C86,C,test
206,0.0,3,"Strom, Miss. Telma Matilda",female,2.0,0,1,347054,10.4625,G6,S,train
965,,1,"Ovies y Rodriguez, Mr. Servando",male,28.5,0,0,PC 17562,27.7208,D43,C,test
1303,,1,"Minahan, Mrs. William Edward (Lillian E Thorpe)",female,37.0,1,0,19928,90.0,C78,Q,test
1254,,2,"Ware, Mrs. John James (Florence Louise Long)",female,31.0,0,0,CA 31352,21.0,,S,test
395,1.0,3,"Sandstrom, Mrs. Hjalmar (Agnes Charlotta Bengtsson)",female,24.0,0,2,PP 9549,16.7,G6,S,train
3,1.0,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S,train
980,,3,"O'Donoghue, Ms. Bridget",female,,0,0,364856,7.75,,Q,test
706,0.0,2,"""Morley, Mr. Henry Samuel (""""Mr Henry Marshall"""")""",male,39.0,0,0,250655,26.0,,S,train


In [8]:
train.filter(F.col("kaggle_test_train") == 'train').describe().show()
train.filter(F.col("kaggle_test_train") == 'test').describe().show()

In [9]:
display(
  train
  .select([F.countDistinct(c).alias(c) for c in train.columns])
  .toPandas().transpose().reset_index()
  .sort_values(by=0, ascending=False)
)

index,0
PassengerId,1309
Name,1307
Ticket,929
Fare,281
Cabin,186
Age,98
Parch,8
SibSp,7
Pclass,3
Embarked,3


## Variation in target metric over dimensions

In [11]:
print(train.select(F.mean("survived")).collect()[0][0])

#train.groupBy("Survived").agg(F.count('*')).show()

In [12]:
display(
  train.filter(F.col("kaggle_test_train") == 'train')
  .groupBy("Sex")
  .agg(F.mean("Survived"))
)

Sex,avg(Survived)
female,0.7420382165605095
male,0.1889081455805892


In [13]:
attr = 'Pclass'

display(
  train.filter(F.col("kaggle_test_train") == 'train')
  .groupBy(F.round(F.col(attr), 0).alias(attr), 'Survived')
  .agg(F.count('*').alias("count"))
  .orderBy(attr,'Survived')
)

Pclass,Survived,count
1,0,80
1,1,136
2,0,97
2,1,87
3,0,372
3,1,119


In [14]:
attr = 'Fare'

display(
  train.filter(F.col("kaggle_test_train") == 'train')
  .groupBy(F.round(F.col(attr), -1).alias(attr), 'Survived')
  .agg(F.count('*').alias("count"))
  .orderBy(attr,'Survived')
)

Fare,Survived,count
0.0,0,15
0.0,1,1
10.0,0,328
10.0,1,113
20.0,0,54
20.0,1,46
30.0,0,76
30.0,1,59
40.0,0,16
40.0,1,12


In [15]:
attr = 'Age'

display(
  train.filter(F.col("kaggle_test_train") == 'train')
  .groupBy(F.round(F.col(attr), -1).alias(attr), 'Survived')
  .agg(F.count('*').alias("count"))
  .orderBy(attr,'Survived')
)

Age,Survived,count
,0,125
,1,52
0.0,0,13
0.0,1,27
10.0,0,20
10.0,1,18
20.0,0,127
20.0,1,73
30.0,0,123
30.0,1,78


## Extract some more features manually

In [17]:
## Various
train_0 = (
  train
  .withColumn("CabinLetter", F.substring(F.col("Cabin"), 1, 1))
  .withColumn("CabinNumber", F.regexp_extract(F.col("Cabin"), r'[0-9]+', 0))
  .withColumn("CabinEntries", F.size(F.split(F.col('Cabin'), ' ')))
  .withColumn("TicketNumber", F.regexp_extract(F.col("Ticket"), r'[0-9]{3,}', 0))
  .withColumn("TicketNumber", F.when(F.col("TicketNumber") == '', F.lit(None)).otherwise(F.col("TicketNumber")).cast("double"))
  .withColumn("TicketHasLetters", (F.length(F.regexp_extract(F.col("Ticket"), '[a-zA-Z]', 0)) > 0).cast('string'))
  .withColumn("LastName", F.split(F.regexp_replace(F.col('Name'), '''["()]''',''), ',').getItem(0))
  .withColumn("FirstName", F.split(F.split(F.regexp_replace(F.col('Name'), '''["()]''',''), '\.').getItem(1), ' ').getItem(1))
  .withColumn("NamePrefix", F.regexp_extract(F.col("Name"), r'\bMiss\.|\bMrs\.|\bMs\.|\bMr\.|\bMaster\.|\bDr\.|\bRev\.', 0))
  #.withColumn("SexAge", F.when(F.col("Sex") == 'female', F.lit(2)).otherwise(F.lit(1)) * ( (100 - F.col("Age")) / 100) )
  #.withColumn("SexFare", F.when(F.col("Sex") == 'female', F.lit(2)).otherwise(F.lit(1)) * (F.col("Fare") / 520))
)

## Cabin passengers
train_0 = (
  train_0
  .groupBy("Cabin")
  .agg(F.count("PassengerId").alias("CabinPassengers"))
  .join(train_0, on='Cabin', how="right")
  .fillna(-1, subset=['CabinPassengers'])
)

display(train_0.orderBy(F.rand()).limit(20))

Cabin,CabinPassengers,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Embarked,kaggle_test_train,CabinLetter,CabinNumber,CabinEntries,TicketNumber,TicketHasLetters,LastName,FirstName,NamePrefix
B73,1,521,1.0,1,"Perreault, Miss. Anne",female,30.0,0,0,12749,93.5,S,train,B,73.0,1,12749.0,False,Perreault,Anne,Miss.
,-1,1015,,3,"Carver, Mr. Alfred John",male,28.0,0,0,392095,7.25,S,test,,,-1,392095.0,False,Carver,Alfred,Mr.
B41,2,1289,,1,"Frolicher-Stehli, Mrs. Maxmillian (Margaretha Emerentia Stehli)",female,48.0,1,1,13567,79.2,C,test,B,41.0,1,13567.0,False,Frolicher-Stehli,Maxmillian,Mrs.
,-1,232,0.0,3,"Larsson, Mr. Bengt Edvin",male,29.0,0,0,347067,7.775,S,train,,,-1,347067.0,False,Larsson,Bengt,Mr.
,-1,989,,3,"Makinen, Mr. Kalle Edvard",male,29.0,0,0,STON/O 2. 3101268,7.925,S,test,,,-1,3101268.0,True,Makinen,Kalle,Mr.
,-1,544,1.0,2,"Beane, Mr. Edward",male,32.0,1,0,2908,26.0,S,train,,,-1,2908.0,False,Beane,Edward,Mr.
,-1,590,0.0,3,"Murdlin, Mr. Joseph",male,,0,0,A./5. 3235,8.05,S,train,,,-1,3235.0,True,Murdlin,Joseph,Mr.
,-1,655,0.0,3,"""Hegarty, Miss. Hanora """"Nora""""""",female,18.0,0,0,365226,6.75,Q,train,,,-1,365226.0,False,Hegarty,Hanora,Miss.
,-1,1249,,3,"Lockyer, Mr. Edward",male,,0,0,1222,7.8792,S,test,,,-1,1222.0,False,Lockyer,Edward,Mr.
,-1,595,0.0,2,"Chapman, Mr. John Henry",male,37.0,1,0,SC/AH 29037,26.0,S,train,,,-1,29037.0,True,Chapman,John,Mr.


In [18]:
attr = 'CabinLetter'

display(
  train_0.filter(F.col("kaggle_test_train") == 'train')
  .groupBy(attr, 'Survived')
  .agg(F.count('*').alias("count"))
  .orderBy(attr,'Survived')
)

CabinLetter,Survived,count
,0,481
,1,206
A,0,8
A,1,7
B,0,12
B,1,35
C,0,24
C,1,35
D,0,8
D,1,25


In [19]:
attr = 'NamePrefix'

display(
  train_0.filter(F.col("kaggle_test_train") == 'train')
  .groupBy(attr, 'Survived')
  .agg(F.count('*').alias("count"))
  .orderBy(F.desc("count"))
)

NamePrefix,Survived,count
Mr.,0,436
Miss.,1,127
Mrs.,1,99
Mr.,1,81
Miss.,0,55
Mrs.,0,26
Master.,1,23
Master.,0,17
,1,8
Rev.,0,6


In [20]:
display(train_0.filter(F.col("kaggle_test_train") == 'train').select([c[0] for c in train_0.dtypes if c[1] in ['int','double']]).fillna(-1))

PassengerId,Survived,Pclass,Age,SibSp,Parch,Fare,CabinEntries,TicketNumber
417,1,2,34.0,1,1,32.5,-1,28220.0
590,0,3,-1.0,0,0,8.05,-1,3235.0
329,1,3,31.0,1,1,20.525,-1,363291.0
828,1,2,1.0,0,2,37.0042,-1,2079.0
561,0,3,-1.0,0,0,7.75,-1,372622.0
566,0,3,24.0,2,0,24.15,-1,48871.0
481,0,3,9.0,5,2,46.9,-1,2144.0
422,0,3,21.0,0,0,7.7333,-1,13032.0
568,0,3,29.0,0,4,21.075,-1,349909.0
491,0,3,-1.0,1,0,19.9667,-1,65304.0


# Feature Preparation

In [22]:
project_name = 'titanic'
target_var = 'Survived'
id_col = 'PassengerId'
test_train_col = 'kaggle_test_train'

## Variables that might be numeric but we'd like to handle as categorical
categorical_assign = []
dont_use = []

features = {}
nominal = []; continuous = []

for C in train_0.dtypes:
  if C[0] not in [target_var, id_col, test_train_col] + categorical_assign + dont_use:
    if C[1] == 'string':
      nominal.append(C[0])
    if C[1] in ['int','double','float']:
      continuous.append(C[0])
    
features['nominal'] = nominal + categorical_assign
features['continuous'] = continuous

print(features['nominal'])
print(' ')
print(features['continuous'])

In [23]:
threshold = .01
maxcats = 30

train_1 = train_0.select("*")
print(train_1.persist().count())
F.broadcast(train_1)

removals = []

## Faster version using pandas for small data
arrays = train_1.filter(F.col(test_train_col) == 'test').fillna('171717.1717').select([F.collect_list(F.col(c)).alias(c) for c in features['nominal'][:]]).collect()
tcount = train_1.filter(F.col(test_train_col) == 'test').count()

for c in features['nominal'][:]:
  valcounts = pd.value_counts(arrays[0][c], dropna=False).sort_values(ascending=False)
  valcounts = valcounts[valcounts >= (threshold * tcount)]
  topvals = valcounts.head(maxcats).index.tolist()
  topvals = [s if s != '171717.1717' else None for s in topvals]
  
  if len(topvals) > 0:
    print(c.ljust(20) + ": ", "keeping " + str(len(topvals)) + " of " + str(len(set(arrays[0][c]))))
    train_1 = (
      train_1
      .withColumn(c, F.when(F.col(c).isin(topvals), F.col(c)).otherwise(F.lit('val_removed')))
    )
    
  else:
    print(c.ljust(20) + ": ", "removing, " + str(len(set(arrays[0][c]))) + " distinct")
    train_1 = train_1.drop(c)


## Spark version for bigger data
# for C in features['nominal'][:]:
  
#   ## using kaggle test set counts to avoid using values that aren't in the test set
#   tcount = train_1.filter(F.col(test_train_col) == 'test').count()
#   dvals = train_1.filter(F.col(test_train_col) == 'test').select(C).distinct().count()
  
#   topvals = [ s[0] for s in (
#     train_1
#     .filter(F.col(test_train_col) == 'test')
#     .groupBy(C)
#     .agg((F.count("*") / tcount).alias("pct"))
#     .filter(F.col("pct") > threshold)
#     .orderBy(F.desc("pct"))
#     .limit(maxcats)
#     .select(C)
#     .collect()
#   ) ]
  
#   if len(topvals) > 0:
#     print(C.ljust(20) + ": ", "keeping " + str(len(topvals)) + " of " + str(dvals))
#     train_1 = (
#       train_1
#       .withColumn(C, F.when(F.col(C).isin(topvals), F.col(C)).otherwise(F.lit('val_removed')))
#     )
    
#   else:
#     print(C.ljust(20) + ": ", "removing, " + str(dvals) + " distinct")
#     train_1 = train_1.drop(C)

In [24]:
for C in list(features.keys())[:]:
  for f in features[C][:]:
    if f not in train_1.columns:
      #print(f)
      features[C].remove(f)
      
train_1 = train_1.select([id_col, target_var, test_train_col] + features['nominal'] + features['continuous'])
      
print(features['nominal'])
print(' ')
print(features['continuous'])

In [25]:
train_2 = (
  train_1
  .fillna('null_val', subset=features['nominal'])
)

## Mean/Median imputing
mean_impute = Imputer(inputCols=features['continuous'], outputCols=[s + '_imp' for s in features['continuous']], strategy='median') ## impute missing with median
train_3 = mean_impute.fit(train_2).transform(train_2)

for c in features['continuous']:
  train_3 = train_3.drop(c)

train_3 = train_3.select([test_train_col,id_col,target_var] + features['nominal'] + [F.col(s + '_imp').alias(s) for s in features['continuous']])

#display(train_3.describe())
display(train_3.orderBy(F.rand()).limit(20))

kaggle_test_train,PassengerId,Survived,Cabin,Sex,Ticket,Embarked,CabinLetter,CabinNumber,TicketHasLetters,FirstName,NamePrefix,Pclass,Age,SibSp,Parch,Fare,CabinEntries,TicketNumber
train,8,0.0,val_removed,male,val_removed,S,val_removed,val_removed,False,val_removed,Master.,3,2.0,3,1,21.075,-1,349909.0
train,89,1.0,val_removed,female,val_removed,S,C,val_removed,False,val_removed,Miss.,1,23.0,3,2,263.0,3,19950.0
test,1103,,val_removed,male,val_removed,S,val_removed,val_removed,True,val_removed,Mr.,3,28.0,0,0,7.05,-1,3101308.0
test,1094,,val_removed,male,val_removed,C,C,val_removed,True,John,val_removed,1,47.0,1,0,227.525,2,17757.0
test,1301,,val_removed,female,val_removed,S,val_removed,val_removed,True,val_removed,Miss.,3,3.0,1,1,13.775,-1,3101315.0
test,933,,val_removed,male,val_removed,S,D,34,False,Thomas,Mr.,1,28.0,0,0,26.55,1,113778.0
train,684,0.0,val_removed,male,val_removed,S,val_removed,val_removed,True,Charles,Mr.,3,14.0,5,2,46.9,-1,2144.0
train,360,1.0,val_removed,female,val_removed,Q,val_removed,val_removed,False,val_removed,Miss.,3,28.0,0,0,7.8792,-1,330980.0
train,501,0.0,val_removed,male,val_removed,S,val_removed,val_removed,False,val_removed,Mr.,3,17.0,0,0,8.6625,-1,315086.0
train,776,0.0,val_removed,male,val_removed,S,val_removed,val_removed,False,val_removed,Mr.,3,18.0,0,0,7.75,-1,347078.0


In [26]:
train_3.write.mode("overwrite").saveAsTable(f"{project_name}_features")

# Model build

In [28]:
train_3 = spark.sql(f"SELECT * FROM {project_name}_features")

print(train_3.persist().count())

In [29]:
# train_3 = (
#   train_3
#   .join(spark.sql("SELECT *, 1 AS balanced FROM balanced_train_set"), on="PassengerId", how="left")
#   .filter( (F.col("kaggle_test_train") == 'test') | (F.col("balanced") == 1) )
#   .drop("balanced")
# )

# display(
#   train_3
#   .groupBy("kaggle_test_train")
#   .agg(F.count("*"))
# )

In [30]:
max_cat = 30

string_indexer = StringIndexer(inputCols=[c for c in features['nominal']], outputCols=[c + '_enc' for c in features['nominal']])
vector_assembler = VectorAssembler(inputCols=features['continuous'] + [s + '_enc' for s in features['nominal']], outputCol='features_1')
vector_indexer = VectorIndexer(inputCol="features_1", outputCol="features", maxCategories=max_cat)

pipe = Pipeline(stages= [string_indexer, vector_assembler, vector_indexer] )

train_4 = pipe.fit(train_3).transform(train_3).select(test_train_col, id_col, target_var, 'features').withColumnRenamed(target_var,'label')

display(train_4.persist().limit(3))

kaggle_test_train,PassengerId,label,features
train,74,0,"List(0, 16, List(0, 1, 2, 4, 5, 6, 10), List(2.0, 26.0, 1.0, 14.4542, 0.0, 2680.0, 1.0))"
train,64,0,"List(0, 16, List(0, 1, 2, 3, 4, 5, 6, 15), List(2.0, 4.0, 3.0, 2.0, 27.9, 0.0, 347088.0, 3.0))"
train,727,1,"List(0, 16, List(0, 1, 2, 4, 5, 6, 8, 15), List(1.0, 30.0, 3.0, 21.0, 0.0, 31027.0, 1.0, 2.0))"


In [31]:
train_4.write.mode("overwrite").saveAsTable(f"{project_name}_vectors")

In [32]:
project_name   = 'titanic'
target_var     = 'Survived'
id_col         = 'PassengerId'
test_train_col = 'kaggle_test_train'

features = {}
features['nominal'] = ['Cabin', 'Sex', 'Ticket', 'Embarked', 'CabinLetter', 'CabinNumber', 'TicketHasLetters', 'FirstName', 'NamePrefix']
 
features['continuous'] = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'CabinEntries', 'TicketNumber'] #'SexAge', 'SexFare']

train_4 = spark.sql(f"SELECT * FROM {project_name}_vectors")

In [33]:
train_data, test_data = (
  train_4
  .filter(F.col("kaggle_test_train") == 'train')
  #.orderBy(F.rand())
  .randomSplit([.7,.3], seed=np.random.randint(100))
)

## Splitting based on PassengerId instead of random as that's how Kaggle's split seems to be done
# train_data = train_4.filter(F.col("kaggle_test_train") == 'train').filter(F.col("PassengerId") <= 623)
# test_data  = train_4.filter(F.col("kaggle_test_train") == 'train').filter(F.col("PassengerId") > 623)

train_data = train_data.withColumn("test_train", F.lit("train"))
test_data  = test_data.withColumn("test_train", F.lit("test"))

full_data = train_data.unionAll(test_data)
full_data.persist()

display(
  full_data
  .groupBy("kaggle_test_train","test_train")
  .agg(F.count("*"), F.countDistinct(id_col), F.sum("label"), F.round(F.mean('label'), 3))
)

kaggle_test_train,test_train,count(1),count(PassengerId),sum(label),"round(avg(label), 3)"
train,test,270,270,102,0.378
train,train,621,621,240,0.386


In [34]:
removals = []
# keepers = ['Sex_enc','Embarked_enc','Fare']
keepers = [s for s in [c + '_enc' for c in features['nominal']] + features['continuous'] if s not in removals]
print(keepers)

slicer = VectorSlicer(inputCol="features_in", outputCol="features", names=keepers)

full_data = slicer.transform(full_data.withColumnRenamed("features","features_in")).drop("features_in")
train_4 = slicer.transform(train_4.withColumnRenamed("features","features_in")).drop("features_in")

#display(full_data.limit(3))

In [35]:
estimator = RandomForestClassifier(labelCol='label', featuresCol='features')

## CV Hyperparameter search
paramGrid = (
  ParamGridBuilder()
  .addGrid(estimator.maxDepth,               [5,8]      )
  .addGrid(estimator.numTrees,               [100]      )
  .addGrid(estimator.minInstancesPerNode,    [1,10]     )
  .addGrid(estimator.featureSubsetStrategy , ['sqrt']   )
  .addGrid(estimator.subsamplingRate,        [.3,.5,1]  )
  .build()
)

crossval = CrossValidator(
  estimator=estimator,
  estimatorParamMaps=paramGrid,
  evaluator=BinaryClassificationEvaluator(),
  numFolds=3
)

## Returns model with best performance
cvModel = crossval.fit(full_data.filter(F.col("test_train") == 'train'))

# estimator = RandomForestClassifier(
#   labelCol='label',
#   featuresCol='features',
#   maxDepth=                7,
#   numTrees=                200,
#   minInstancesPerNode=     1,
#   featureSubsetStrategy=   '3',
#   subsamplingRate=         .2
# )

# cvModel = estimator.fit(full_data.filter(F.col("test_train") == 'train'))

predictions = cvModel.transform(full_data)
print(predictions.persist().count())

print("numTrees  maxDepth  minInstances featureSubsetStrategy subSamplingRate")
print(cvModel.bestModel.getNumTrees, cvModel.bestModel.getMaxDepth(), cvModel.bestModel.getMinInstancesPerNode(), cvModel.bestModel.getFeatureSubsetStrategy(), cvModel.bestModel.getSubsamplingRate())
#print(cvModel.getNumTrees, cvModel.getMaxDepth(), cvModel.getMinInstancesPerNode(), cvModel.getFeatureSubsetStrategy(), cvModel.getSubsamplingRate())
print(" ")
print("Train auROC: ")
print(np.round(BinaryClassificationEvaluator(rawPredictionCol="rawPrediction").evaluate(predictions.filter(F.col("test_train") == 'train')), 3))
print("Test auROC: ")
print(np.round(BinaryClassificationEvaluator(rawPredictionCol="rawPrediction").evaluate(predictions.filter(F.col("test_train") == 'test')), 3))
print(" ")

def get_accuracy(preds):
  tcount = preds.count()
  eql = preds.filter(F.col("label") == F.col("prediction")).count()
  return np.round(eql / tcount, 3)

print("Train Accuracy: ")
print(get_accuracy(predictions.filter(F.col("test_train") == 'train')))
print("Test Accuracy: ")
print(get_accuracy(predictions.filter(F.col("test_train") == 'test')))

##### Appears to still be overfitting, need to do more work on this.

Perhaps many of the features are correlated (eg. NamePrefix and Sex) and that is taking some of the 'random' out of the 'forest'

## Feature Insights

In [38]:
def get_feature_importances(training_data, model, featuresCol):
      "Returns feature importance values for the passed dataset and model"
      
      feature_metadata = training_data.select(featuresCol).schema[0].metadata['ml_attr']['attrs']

      attribute_idx_title = {}
      for i in feature_metadata.keys():
        metadata = feature_metadata[i]
        for j in metadata:
          attribute_idx_title[j["idx"]] = j["name"]

      feature_names = []; feature_indices = []; importances = []
      for K in list(model.featureImportances.indices):
        feature_indices.append(K)
        feature_names.append(attribute_idx_title[int(K)])
        importances.append(model.featureImportances[int(K)])
        
      max_importance = max(importances)
    
      feat_importances_table = spark.createDataFrame(
        pd.DataFrame(
        data={"feature":[f.replace("_enc","") for f in feature_names],
              "importance": [v for v in importances],
              "relative_importance": [np.round(v / float(max_importance), 5) for v in importances],
              "idx": feature_indices
             }
        )
      ).orderBy("relative_importance", ascending=False)
      
      return feat_importances_table

In [39]:
feat_imps = get_feature_importances(full_data, cvModel.bestModel, "features").select("feature","relative_importance")
#feat_imps = get_feature_importances(full_data, cvModel, "features").select("feature","relative_importance")

display(feat_imps)

feature,relative_importance
Sex,1.0
NamePrefix,0.65687
Fare,0.31553
Age,0.22659
TicketNumber,0.21295
Pclass,0.17768
CabinLetter,0.16501
CabinEntries,0.13705
SibSp,0.13622
Parch,0.1193


# Predict on Kaggle Test Set

In [41]:
# final_model = RandomForestClassifier(
#   labelCol='label',
#   featuresCol='features',
#   numTrees=cvModel.bestModel.getNumTrees,
#   maxDepth=cvModel.bestModel.getMaxDepth(),
#   minInstancesPerNode=cvModel.bestModel.getMinInstancesPerNode(),
#   featureSubsetStrategy=cvModel.bestModel.getFeatureSubsetStrategy()
# )

final_model = estimator

kaggle_test = train_4.filter(F.col("kaggle_test_train") == 'test')

final_predictions = final_model.fit(full_data).transform(kaggle_test)
#final_predictions = cvModel.transform(kaggle_test)

print(final_predictions.persist().count())

display(final_predictions.select(id_col,F.col("prediction").alias(target_var)).orderBy(id_col))

PassengerId,Survived
892,0.0
893,0.0
894,0.0
895,0.0
896,1.0
897,0.0
898,1.0
899,0.0
900,1.0
901,0.0


# End