In [1]:
import os
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.wrapper import JavaEstimator, JavaModel
from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer, Bucketizer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import sklearn
import xgboost as xgb
# from xgboost import XGBClassifier
import sklearn

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /Users/stirlingwaite/Projects/personal/utah-data-engineering-pyspark/jars/xgboost4j-spark-0.72.jar,/Users/stirlingwaite/Projects/personal/utah-data-engineering-pyspark/jars/xgboost4j-0.72.jar pyspark-shell'

conf = SparkConf()\
                .setMaster("local[2]")\
                .setAppName("xgbooster")\
                .set("spark.executor.memory", "6g")\
                .set("spark.driver.memory", "6g") 
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sc.addPyFile("/Users/stirlingwaite/Projects/personal/utah-data-engineering-pyspark/jars/sparkxgb.zip")
sqlContext = SQLContext(sc)
spark = SparkSession.builder.appName("spark play").getOrCreate()

In [3]:
# gender_df = spark.read.csv("file:///Users/stirlingwaite/Projects/personal/utah-data-engineering-pyspark/data/gender_submission.csv", header=True, mode="DROPMALFORMED", inferSchema='true', encoding="utf-8").persist()
training_df = spark.read.csv("file:///Users/stirlingwaite/Projects/personal/utah-data-engineering-pyspark/data/train.csv", header=True, mode="DROPMALFORMED", inferSchema='true', encoding="utf-8").persist()
testing_df = spark.read.csv("file:///Users/stirlingwaite/Projects/personal/utah-data-engineering-pyspark/data/test.csv", header=True, mode="DROPMALFORMED", inferSchema='true', encoding="utf-8").persist()

training_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [4]:
# // Feature Engineering Step:
# // Surname Regex: Pull last name from the Name record
# // Honorific Regex: Pull title from the Name column.  Create a column per title.
# // Family Size: Add the SibSp and Parch columns, and a 1 for self
# // Family Buckets: Singleton = 1, SmallFam = 2 to 4, LargeFam > 5
# // Child feature: Is the person 18 or under
# // Mother feature: 16 or older, female, Honorific other than "Miss", and Parch greater than 0
    
training_features = (training_df
                    .withColumn("Surname", regexp_extract(col("Name"),"([\\w ']+),",1))
                    .withColumn("Honorific", regexp_extract(col("Name"),"(.*?)([\\w]+?)[.]",2))
                    .withColumn("Mil", when((col("Honorific") == "Col") | (col("Honorific") == "Major") | (col("Honorific") == "Capt"), 1).otherwise(0))
                    .withColumn("Doc", when(col("Honorific") == "Dr", 1).otherwise(0))
                    .withColumn("Rev", when(col("Honorific") == "Rev", 1).otherwise(0))
                    .withColumn("Nob", when((col("Honorific") == "Sir") |
                        (col("Honorific") == "Countess") |
                        (col("Honorific") == "Count") |
                        (col("Honorific") == "Duke") |
                        (col("Honorific") == "Duchess") |
                        (col("Honorific") == "Jonkheer") |
                        (col("Honorific") == "Don") |
                        (col("Honorific") == "Dona") |
                        (col("Honorific") == "Lord") |
                        (col("Honorific") == "Lady") |
                        (col("Honorific") == "Earl") |
                        (col("Honorific") == "Baron"), 1).otherwise(0))
                    .withColumn("Mr", when(col("Honorific") == "Mr", 1).otherwise(0))
                    .withColumn("Mrs", when((col("Honorific") == "Mrs") | (col("Honorific") == "Mme"), 1).otherwise(0))
                    .withColumn("Miss", when((col("Honorific") == "Miss") | (col("Honorific") == "Mlle"), 1).otherwise(0))
                    .withColumn("Mstr", when(col("Honorific") == "Master", 1).otherwise(0))
                    .withColumn("TotalFamSize", col("SibSp") + col("Parch") + 1)
                    .withColumn("Singleton", when(col("TotalFamSize") == 1, 1).otherwise(0))
                    .withColumn("SmallFam", when((col("TotalFamSize") <= 4) & (col("TotalFamSize") > 1), 1).otherwise(0))
                    .withColumn("LargeFam", when(col("TotalFamSize") >= 5, 1).otherwise(0))
                    .withColumn("Child", when((col("Age") <= 18), 1).otherwise(0))
                    .withColumn("Mother", when((col("Age") > 15) &
                        (col("Parch") > 0) & 
                        (col("Miss") == 0) & 
                        (col("Sex") == "female"),1).otherwise(0)))
training_features.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+---------+---+---+---+---+---+---+----+----+------------+---------+--------+--------+-----+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|      Surname|Honorific|Mil|Doc|Rev|Nob| Mr|Mrs|Miss|Mstr|TotalFamSize|Singleton|SmallFam|LargeFam|Child|Mother|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+---------+---+---+---+---+---+---+----+----+------------+---------+--------+--------+-----+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|       Braund|       Mr|  0|  0|  0|  0|  1|  0|   0|   0|           2|        0|       1|       0|    0|     0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|7

In [5]:
# // Explore the data
(training_features.groupBy("Pclass","Embarked")
                  .agg(
                      count("*").alias("count"),
                      avg("Fare"),
                      min("Fare"),
                      max("Fare"),
                      stddev("Fare")
                  )
                  .orderBy("Pclass","Embarked")
                  .show())

training_features.createOrReplaceTempView("training_features")

+------+--------+-----+------------------+---------+---------+------------------+
|Pclass|Embarked|count|         avg(Fare)|min(Fare)|max(Fare)| stddev_samp(Fare)|
+------+--------+-----+------------------+---------+---------+------------------+
|     1|    null|    2|              80.0|     80.0|     80.0|               0.0|
|     1|       C|   85|104.71852941176469|    26.55| 512.3292|  99.0939349696501|
|     1|       Q|    2|              90.0|     90.0|     90.0|               0.0|
|     1|       S|  127| 70.36486220472443|      0.0|    263.0|58.811277761795566|
|     2|       C|   17|25.358335294117644|     12.0|  41.5792|11.345067090697457|
|     2|       Q|    3|             12.35|    12.35|    12.35|               0.0|
|     2|       S|  164|20.327439024390245|      0.0|     73.5|13.630741099088103|
|     3|       C|   66|11.214083333333337|   4.0125|  22.3583| 4.871528353625736|
|     3|       Q|   72|11.183393055555557|     6.75|   29.125| 6.721676511682005|
|     3|       S

In [6]:
spark.sql("""
          SELECT 
              Pclass,
              Embarked,
              percentile_approx(Fare, 0.5) AS Median_Fare 
          FROM training_features 
          WHERE Fare IS NOT NULL 
          AND Pclass = 1 
          GROUP BY Pclass, Embarked
          """).show()

+------+--------+-----------+
|Pclass|Embarked|Median_Fare|
+------+--------+-----------+
|     1|    null|       80.0|
|     1|       Q|       90.0|
|     1|       C|    78.2667|
|     1|       S|       52.0|
+------+--------+-----------+



In [7]:
# // Impute Embarked column
# // From the discovery above, the likely port is C, since the Median for C is closest to 80.
train_embarked = training_features.na.fill("C", ["Embarked"])
print(train_embarked.show())

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+---------+---+---+---+---+---+---+----+----+------------+---------+--------+--------+-----+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|      Surname|Honorific|Mil|Doc|Rev|Nob| Mr|Mrs|Miss|Mstr|TotalFamSize|Singleton|SmallFam|LargeFam|Child|Mother|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+---------+---+---+---+---+---+---+----+----+------------+---------+--------+--------+-----+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|       Braund|       Mr|  0|  0|  0|  0|  1|  0|   0|   0|           2|        0|       1|       0|    0|     0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|7

In [8]:
# // Perform discovery on missing data in Age column
# // Create the temp table view so we can perform spark.sql queries on the dataframe
train_embarked.createOrReplaceTempView("train_embarked")

# // Explore the data
# // Count nulls for each Honorific.  Some titles can imply age (miss,master,etc)
spark.sql("SELECT Honorific,count(*) as nullAge FROM train_embarked WHERE Age IS NULL GROUP BY Honorific").show()

+---------+-------+
|Honorific|nullAge|
+---------+-------+
|     Miss|     36|
|   Master|      4|
|       Mr|    119|
|       Dr|      1|
|      Mrs|     17|
+---------+-------+



In [9]:
# // Calculate the average age for the Honorific titles that have nulls
spark.sql("SELECT Honorific,round(avg(Age)) as avgAge FROM train_embarked WHERE Age IS NOT NULL AND Honorific IN ('Miss','Master','Mr','Dr','Mrs') GROUP BY Honorific").show()

+---------+------+
|Honorific|avgAge|
+---------+------+
|     Miss|  22.0|
|   Master|   5.0|
|       Mr|  32.0|
|       Dr|  42.0|
|      Mrs|  36.0|
+---------+------+



In [10]:
# // Impute the missing Age values for the relevant Honorific columns and union the data back together
train_miss_df = train_embarked.na.fill(22.0).where("Honorific = 'Miss'")
train_master_df = train_embarked.na.fill(5.0).where("Honorific = 'Master'")
train_mr_df = train_embarked.na.fill(32.0).where("Honorific = 'Mr'")
train_dr_df = train_embarked.na.fill(42.0).where("Honorific = 'Dr'")
train_mrs_df = train_embarked.na.fill(36.0).where("Honorific = 'Mrs'")
train_remainder_df = spark.sql("SELECT * FROM train_embarked WHERE Honorific NOT IN ('Miss','Master','Dr','Mr','Mrs')")
train_combined_df = train_remainder_df.union(train_miss_df).union(train_master_df).union(train_mr_df).union(train_dr_df).union(train_mrs_df)

In [11]:
# // Convert the categorical (string) values into numeric values
# // Convert the categorical (string) values into numeric values
gender_indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex").setHandleInvalid("keep")
embark_indexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkIndex").setHandleInvalid("keep")

In [12]:
# // Convert the numerical index columns into One Hot columns
# // The One Hot columns are binary {0,1} values of the categories
gender_encoder = OneHotEncoder(dropLast=False, inputCol="SexIndex", outputCol="SexVec")
embark_encoder = OneHotEncoder(dropLast=False, inputCol="EmbarkIndex", outputCol="EmbarkVec")

In [13]:
# // Create 8 buckets for the fares, turning a continuous feature into a discrete range# // Cre 
fare_splits = [0.0,10.0,20.0,30.0,40.0,60.0,120.0, float("+inf")]
fare_bucketize = Bucketizer().setInputCol("Fare").setOutputCol("FareBucketed").setSplits(fare_splits)

In [14]:
# // Create a vector of the features.  
assembler = VectorAssembler().setInputCols(["Pclass", "SexVec", "Age", "SibSp", "Parch", "Fare", "FareBucketed", "EmbarkVec", "Mil", "Doc", "Rev", "Nob", "Mr", "Mrs", "Miss", "Mstr", "TotalFamSize", "Singleton", "SmallFam", "LargeFam", "Child", "Mother"]).setOutputCol("features")

In [15]:
# // Create the features pipeline and data frame
# // The order is important here, Indexers have to come before the encoders
training_features_pipeline = (Pipeline().setStages([gender_indexer, embark_indexer, gender_encoder, embark_encoder, fare_bucketize, assembler]))
training_features_df = training_features_pipeline.fit(train_combined_df).transform(train_combined_df)

In [16]:
# // Now that the data has been prepared, let's split the dataset into a training and test dataframe
train_df, test_df = training_features_df.randomSplit([0.8, 0.2], seed = 12345)
train_df.show(2)

+-----------+--------+------+--------------------+----+----+-----+-----+--------+-------+-----+--------+---------+---------+---+---+---+---+---+---+----+----+------------+---------+--------+--------+-----+------+--------+-----------+-------------+-------------+------------+--------------------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|  Surname|Honorific|Mil|Doc|Rev|Nob| Mr|Mrs|Miss|Mstr|TotalFamSize|Singleton|SmallFam|LargeFam|Child|Mother|SexIndex|EmbarkIndex|       SexVec|    EmbarkVec|FareBucketed|            features|
+-----------+--------+------+--------------------+----+----+-----+-----+--------+-------+-----+--------+---------+---------+---+---+---+---+---+---+----+----+------------+---------+--------+--------+-----+------+--------+-----------+-------------+-------------+------------+--------------------+
|         31|       0|     1|Uruchurtu, Don. M...|male|40.0|    0|    0|PC 17601|27.7208| null|       C|Uruchurt

# Data Engineers Job Is Done

# Data Scientist Job Starts

## Predict Survivors using XGBoost Model

In [17]:
class XGBoostEstimator(JavaEstimator, HasFeaturesCol, HasLabelCol):
    def __init__(self, xgb_param_map={}):
        super(XGBoostEstimator, self).__init__()
        sc = SparkContext._active_spark_context
        scala_map = sc._jvm.PythonUtils.toScalaMap(xgb_param_map)
        self._defaultParamMap = xgb_param_map
        self._paramMap = xgb_param_map
        self._from_XGBParamMap_to_params()
        self._java_obj = self._new_java_obj(
            "ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator", self.uid, scala_map)

    def _create_model(self, java_model):
        return JavaModel(java_model)

    def _from_XGBParamMap_to_params(self):
        for param, value in self._paramMap.items():
            setattr(self, param, value)

In [18]:
params = {}
params["eta"] = 0.1
params["max_depth"] = 8
params["gamma"] = 0.0
params["colsample_bylevel"] = 1
params["objective"] = "binary:logistic"
params["num_class"] = 2
params["booster"] = "gbtree"
params["num_rounds"] = 20
params["nWorkers"] = 3

In [19]:
# // Create an XGBoost Classifier
xgbEstimator = XGBoostEstimator(params)\
                            .setFeaturesCol("features")\
                            .setLabelCol("Survived")

In [20]:
# // XGBoost paramater grid
xgbParamGrid = ParamGridBuilder() \
        .addGrid(xgbEstimator.max_depth, [16]) \
        .addGrid(xgbEstimator.eta, [0.015]) \
        .build()

In [21]:
# // Create the XGBoost pipeline
pipeline = Pipeline().setStages([xgbEstimator])

In [22]:
# // Setup the binary classifier evaluator
evaluator = BinaryClassificationEvaluator().setLabelCol("Survived")\
                                              .setRawPredictionCol("prediction")\
                                              .setMetricName("areaUnderROC")

In [23]:
cv = CrossValidator().setEstimator(pipeline) \
                      .setEvaluator(evaluator)\
                      .setEstimatorParamMaps(xgbParamGrid) \
                      .setNumFolds(10)

In [24]:
xgb_model = cv.fit(train_df)

In [25]:
results = xgb_model.transform(test_df)

In [26]:
results.createOrReplaceTempView("results")
spark.sql("SELECT PassengerID as PID,Pclass,Sex,Age,SibSp,Parch,Honorific as Hon,TotalFamSize as Fam,Survived,prediction,probabilities FROM results where Survived != cast(prediction as int)").show(100)

+---+------+------+----+-----+-----+--------+---+--------+----------+--------------------+
|PID|Pclass|   Sex| Age|SibSp|Parch|     Hon|Fam|Survived|prediction|       probabilities|
+---+------+------+----+-----+-----+--------+---+--------+----------+--------------------+
|537|     1|  male|45.0|    0|    0|   Major|  1|       0|       1.0|[0.45705056190490...|
|823|     1|  male|38.0|    0|    0|Jonkheer|  1|       0|       1.0|[0.45705056190490...|
| 69|     3|female|17.0|    4|    2|    Miss|  7|       1|       0.0|[0.53711110353469...|
|193|     3|female|19.0|    1|    0|    Miss|  2|       1|       0.0|           [0.5,0.5]|
|217|     3|female|27.0|    0|    0|    Miss|  1|       1|       0.0|           [0.5,0.5]|
|358|     2|female|38.0|    0|    0|    Miss|  1|       0|       1.0|[0.45705056190490...|
|594|     3|female|22.0|    0|    2|    Miss|  3|       0|       1.0|[0.47225075960159...|
|655|     3|female|18.0|    0|    0|    Miss|  1|       0|       1.0|[0.47225075960159...|

In [27]:
# // What was the overall accuracy of the model, using AUC
evaluator.evaluate(results)

0.8365709459459459