In [1]:
import pandas as pd
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, VectorIndexer
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, struct, lit, coalesce, monotonically_increasing_id, row_number
from pyspark.sql.types import *
from pyspark.sql.window import Window
import numpy as np
import os

Import custom jars for XGBoost PySpark functionality.
sparkxgb is an open-source Python wrapper for XGBoost on Spark. It is outdated, so I have manually removed its references to deprecated PySpark classes.

In [None]:
# Reuse the active Spark session if one exists; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Read the training CSV, taking column names from the header row and letting
# Spark infer each column's type from the data.
csv_reader = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
)
df = csv_reader.load("train.csv")

Examine the volume of missing data

In [None]:
total_rows = df.count()

# Count the nulls of every column in a single aggregation instead of
# launching a separate filter-and-count Spark job per column.
# Column names are backtick-quoted so unusual names still parse as identifiers.
null_count_exprs = [
    f"count(CASE WHEN `{name}` IS NULL THEN 1 END) AS `{name}`"
    for name in df.columns
]
null_counts = df.selectExpr(*null_count_exprs).first()

# Same shape as before: a list of (column name, number of null rows).
missing = [(name, null_counts[name]) for name in df.columns]

print(f"Total samples: {total_rows}")
for name, num_missing in missing:
    # Only report columns that actually have gaps.
    if num_missing > 0:
        print(f"Column: {name}, num missing: {num_missing} (% missing: {num_missing/total_rows*100:.1f}%)")

Over 3/4 of the Cabin data is missing (687/891). There are 148 unique values in total, so most of the 204 non-missing samples are unique. There may still be useful information here (e.g. cabin area, i.e. treating A23 and A12 as equivalent). But for now we will omit Cabin as a feature rather than imputing it or giving XGB free rein.

Embarked has a small amount missing. We can set these to 0 for the sake of one-hot encoding, which will have minimal effect on the data. For Age, a regression tree model will be used to predict the missing values (alternatively, the simple mean could be used).

In [None]:
# Candidate predictor columns and the label column.
# Sex, Cabin, and Embarked must be indexed and potentially encoded
initial_features = ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare", "Cabin", "Embarked"]
target = "Survived"

# Keep the schema as the cell's final expression: a notebook only renders the
# value of the last expression, so a bare `df.dtypes` placed before the
# assignments is evaluated and then silently discarded.
df.dtypes

In [None]:
# Left disabled: this would run one select/distinct/count Spark job per column,
# which the notes below describe as very slow. Uncomment to print the number of
# distinct values in each column.
# num_distinct = [(col, df.select(col).distinct().count()) for col in df.columns]
# for col, num in num_distinct:
#     print(f"Column: {col}, unique value count: {num}")

Let's keep columns as features if we have an a priori reason to believe they would act as strong predictors. These include:
Pclass, Age, Sex, SibSp, Parch, Fare, Cabin, Embarked

Name, PassengerId, and Ticket are omitted as well due to being largely unique values without much discernible useful information (distinct calc above commented because it is very slow)

Replace missing values with 0. Since we are doing so with categorical variables (for which 0 is not a value in the set), in essence we are creating a new category.

In [None]:
# Columns that are indexed and one-hot encoded further down.
features_for_encoding = ["Sex", "Cabin", "Embarked"]

# Work on a separate handle so the raw frame `df` stays untouched.
df2 = df
for name in features_for_encoding:
    current = df2[name]
    # Keep existing values; substitute 0 where the value is null, which
    # effectively introduces an extra "missing" category.
    df2 = df2.withColumn(name, when(current.isNotNull(), current).otherwise(0))

Create the transformation pipeline then keep the transformer for use later on the test set

In [None]:
# Derived column names: "<col>_index" for the indexed value and "<col>_enc"
# for the one-hot vector.
index_cols = [f"{name}_index" for name in features_for_encoding]
encoded_cols = [f"{name}_enc" for name in features_for_encoding]

# One StringIndexer per categorical column.
# NOTE(review): no handleInvalid is set, so the library default applies when the
# fitted pipeline meets labels it did not see during fit (e.g. new Cabin values
# in the test set) - confirm that is the intended behaviour.
indexers = [
    StringIndexer(inputCol=source, outputCol=indexed)
    for source, indexed in zip(features_for_encoding, index_cols)
]
inputs = [indexer.getOutputCol() for indexer in indexers]

# A single encoder turns all index columns into one-hot vectors.
encoder = OneHotEncoder(inputCols=inputs, outputCols=encoded_cols)

# Indexers must run before the encoder that consumes their output.
stages = [*indexers, encoder]
pipeline = Pipeline(stages=stages)

Below we transform the features to one-hot, vectorise them, then train the basic decision tree regressor and predict with it. The decision tree regressor really only has to be "good enough".

In [None]:
# Fit the indexing/encoding pipeline and keep the fitted transformer so the
# same encoding can be applied again later.
feature_transformer = pipeline.fit(df2)
df3 = feature_transformer.transform(df2)

feature_cols = ['Pclass',
                 'Age',
                 'SibSp',
                 'Parch',
                 'Fare',
                 'Sex_enc',
                 'Cabin_enc',
                 'Embarked_enc']

# Age is the regression target here, so every other feature is a predictor.
# NOTE(review): Cabin_enc is still among the predictors although the notes
# above say Cabin would be omitted - confirm which is intended.
age_predictors = [name for name in feature_cols if name != "Age"]

# Assemble the predictors into the single vector column the regressor reads.
vectoriser = VectorAssembler().setInputCols(age_predictors).setOutputCol("Age_Predictors")
age_vectoriser = Pipeline(stages=[vectoriser])
df3 = age_vectoriser.fit(df3).transform(df3)

# Train only on rows with a known Age, then predict for every row so that the
# rows with a missing Age receive a "prediction" value as well.
dt = DecisionTreeRegressor(featuresCol="Age_Predictors", labelCol="Age", maxDepth=5)
df3_filtered = df3.filter(df3["Age"].isNotNull())
dt_model = dt.fit(df3_filtered)
df3 = dt_model.transform(df3)

# Baseline for comparison: predict the mean of the known ages for everyone.
# Computed from the data rather than hard-coded so it stays correct if the
# input file changes.
mean_age = df3_filtered.agg({"Age": "mean"}).first()[0]

# Pair the true Age with each estimate. The pairs are ordered
# (observation, prediction); RMSE is symmetric so the order does not affect the
# numbers printed below, but RegressionMetrics documents its input as
# (prediction, observation), so swap the fields before reading any
# order-sensitive metric from these objects.
df3 = df3.withColumn("age_predAge", struct(df3["Age"], df3["prediction"]))\
        .withColumn("meanAge", lit(mean_age))
df3 = df3.withColumn("age_meanAge", struct(df3["Age"], df3["meanAge"]))

# Evaluate on the rows with a known Age. These are the same rows the tree was
# trained on, so the tree's error here is a training error.
train_acc_df = df3.filter(df3.Age.isNotNull()).select("age_predAge", "age_meanAge")
age_pred_rdd = train_acc_df.select("age_predAge").rdd.flatMap(lambda x:x)
age_mean_rdd = train_acc_df.select("age_meanAge").rdd.flatMap(lambda x:x)

age_pred_metrics = RegressionMetrics(age_pred_rdd)
age_mean_metrics = RegressionMetrics(age_mean_rdd)

# The value reported is the root mean squared error, so label it RMSE.
print(f"RMSE using mean: {age_mean_metrics.rootMeanSquaredError:.2f}")
print(f"RMSE using prediction: {age_pred_metrics.rootMeanSquaredError:.2f}")

Now we have removed null values from categorical columns and made predictions for missing numerical values. All of the above has become a bit gross, largely due to creating duplicate DataFrames (for ease of debugging). Time to clean things up.

In [None]:
# Use the recorded Age where there is one and the tree's prediction otherwise.
imputed_age = coalesce(df3["Age"], df3["prediction"]).alias("Age")

# Build the cleaned frame as a single projection of df3. Both halves come from
# the same rows, so there is no need to split them and re-join on a
# row_number() over monotonically_increasing_id(): that relies on the two
# frames being evaluated in the same row order and runs an unpartitioned window.
df_final = df3.select(*age_predictors, imputed_age)
df_final.show()

In [None]:
# Assemble every model input (the columns listed in `feature_cols`, including
# Age) into a single "features" vector column.
vectoriser = VectorAssembler().setInputCols(feature_cols).setOutputCol("features")
age_vectoriser = Pipeline(stages=[vectoriser])

# Start from df3, which still carries the Survived label on every row, and fill
# the missing ages with the tree's prediction. Keeping label and features on the
# same row avoids re-attaching labels through a row_number() join over
# monotonically_increasing_id(), which depends on row order, and it leaves the
# raw frame `df` intact so earlier cells can be re-run.
df_imputed = df3.withColumn("Age", coalesce(df3["Age"], df3["prediction"]))
df_final = age_vectoriser.fit(df_imputed).transform(df_imputed).select("features", "Survived")
df_final.show()