In [1]:
%pylab inline

import numpy as np
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions
from pyspark.ml.feature import VectorAssembler

Populating the interactive namespace from numpy and matplotlib


In [2]:
 def prepare_data(data):
    ## Fill missing Age values with the average age per class

    explore_age_df = data.orderBy('Age', ascending=True)

    avg_age_df = explore_age_df.where(explore_age_df['Age'].isNotNull()).groupBy('Pclass').avg('Age')
    avg_age_df = avg_age_df.select('Pclass', avg_age_df['avg(Age)'].alias('Age'))

    avg_age_list = avg_age_df.collect()

    # Replace null values with the average age values from our passenger class list
    data_with_age_df = (data
                         .select('*', 
                                 when(data['Age'].isNull() & (data['Pclass'] == 1), 
                                      avg_age_list[0].Age)
                                 .otherwise(when(data['Age'].isNull() & (data['Pclass'] == 2), 
                                                 avg_age_list[1].Age)
                                            .otherwise(when(data['Age'].isNull() & (data['Pclass'] == 3), 
                                                            avg_age_list[2].Age)
                                                       .otherwise(col('Age')))).alias('FilledAge')))

    # Replace the Age column values with those from our FilledAge column and then drop FilledAge.
    data_with_age_df = data_with_age_df.withColumn('Age', data_with_age_df['FilledAge']).drop('FilledAge')

    ## Index Sex

    def sex_to_int(sex):
      if(sex.lower() == 'male'):
        return 0
      else:
        return 1
    sex_classify = functions.udf(sex_to_int, IntegerType())

    sex_int_df = data_with_age_df.select('*', sex_classify(data_with_age_df['Sex']).alias('IntSex'))
    data_sex_indexed_df = sex_int_df.withColumn('Sex', sex_int_df['IntSex']).drop('IntSex').cache()

    data_sex_indexed_df

    ## Index Cabin

    def cabin_to_int(cabin):
        if cabin:
            return ord(cabin[0])-ord('A')+1 #A:1; B:2; C:3; D:4; None:0
        else:
            return 0
    cabin_classify = functions.udf(cabin_to_int, IntegerType())

    cabin_int_df = data_sex_indexed_df.select('*', cabin_classify(data_sex_indexed_df['Cabin']).alias('IntCabin'))
    data_cabin_indexed_df = cabin_int_df.withColumn('Cabin', cabin_int_df['IntCabin']).drop('IntCabin').cache()

    data_cabin_indexed_df

    ##  Index Embarked

    def embarked_to_int(embarked):
        if embarked == 'C':   #Cherbourg; 
            return 1 #TBD
        elif embarked == 'Q': #Queenstown; 
            return 2
        elif embarked == 'S': #Southampton)
            return 3    
        else:
            return 0
    embarked_classify = functions.udf(embarked_to_int, IntegerType())

    embarked_int_df = data_cabin_indexed_df.select('*', embarked_classify(data_sex_indexed_df['Embarked']).alias('IntEmbarked'))
    data_embarked_indexed_df = embarked_int_df.withColumn('Embarked', embarked_int_df['IntEmbarked']).drop('IntEmbarked').cache()

    return data_embarked_indexed_df

## Intitialize Train data

In [3]:
train_data = (sqlContext
                 .read
                 .format('csv')
                 .options(header='true', inferSchema='true')
                 .load('./data/train.csv'))
train_data.cache()
train_data = prepare_data(train_data)
train_data.show(1)

+-----------+--------+------+--------------------+---+----+-----+-----+---------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|Sex| Age|SibSp|Parch|   Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+---+----+-----+-----+---------+----+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  0|22.0|    1|    0|A/5 21171|7.25|    0|       3|
+-----------+--------+------+--------------------+---+----+-----+-----+---------+----+-----+--------+
only showing top 1 row



## Assemble feature vectors

In [4]:
assembler = VectorAssembler(
    inputCols=['Pclass','Sex','Age','SibSp','Parch','Fare','Embarked'], outputCol='features'
)

train_data = assembler.transform(train_data).select('PassengerId',col('Survived').alias('label'),'features')
train_data.show(1,truncate=False)

+-----------+-----+-------------------------------+
|PassengerId|label|features                       |
+-----------+-----+-------------------------------+
|1          |0    |[3.0,0.0,22.0,1.0,0.0,7.25,3.0]|
+-----------+-----+-------------------------------+
only showing top 1 row



In [5]:
splits = train_data.randomSplit([0.8, 0.2])
train_train = splits[0].cache() #caching brings significant ~30% perofromance improvement to fitting
train_test = splits[1].cache()
train_train, train_test

(DataFrame[PassengerId: int, label: int, features: vector],
 DataFrame[PassengerId: int, label: int, features: vector])

### Benchmarking various classifiers

In [6]:
from pyspark.ml.classification import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

for classifier in (NaiveBayes, LogisticRegression, RandomForestClassifier, MultilayerPerceptronClassifier): # '[]' won't work
    
    if classifier != MultilayerPerceptronClassifier:
        model = classifier()
    else:
        #Number of inputs = the size of feature vectors. Number of outputs = the total number of labels.
        features_size = train_train.select("features").first()[0].size
        model = classifier(layers=[features_size,10,2]) 
    model_trained = model.fit(train_train)

    train_test_predicted = model_trained.transform(train_test)

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction") 
    accuracy = evaluator.evaluate(train_test_predicted, {evaluator.metricName: "accuracy"}) # f1|weightedPrecision|weightedRecall|accuracy
    print(""+classifier.__name__.ljust(30) + '\t' + str(accuracy))
#     test_predicted.show(10)

#     print('Wrong predictions for error analysis')
#     test_predicted.filter(test_predicted['prediction'] != test_predicted['label']).show(5)

NaiveBayes                    	0.699453551913
LogisticRegression            	0.803278688525
RandomForestClassifier        	0.819672131148
MultilayerPerceptronClassifier	0.79781420765


## Working on Test data

In [7]:
test_data = (sqlContext
                 .read
                 .format('csv')
                 .options(header='true', inferSchema='true')
                 .load('./data/test.csv'))
test_data.cache()
test_data.show(5)

test_data = prepare_data(test_data)
test_data

+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| null|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0| 363272|    7.0| null|       S|
|        894|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0| 240276| 9.6875| null|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0| 315154| 8.6625| null|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|3101298|12.2875| null|       S|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



DataFrame[PassengerId: int, Pclass: int, Name: string, Sex: int, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: int, Embarked: int]

In [8]:
test_assembler = VectorAssembler(
    inputCols=['Pclass','Sex','Age','SibSp','Parch','Fare','Embarked'], outputCol='features'
)

test_data = test_assembler.transform(test_data).select('PassengerId','features')
test_data.show(1,truncate=False)

+-----------+---------------------------------+
|PassengerId|features                         |
+-----------+---------------------------------+
|892        |[3.0,0.0,34.5,0.0,0.0,7.8292,2.0]|
+-----------+---------------------------------+
only showing top 1 row



### Proceed with Random Forest

In [9]:
rf_model = RandomForestClassifier()
rf_model_fitted = rf_model.fit(train_data)
rf_model_fitted

RandomForestClassificationModel (uid=rfc_4aa3cfd71a7a) with 20 trees

In [10]:
rf_test_predicted = rf_model_fitted.transform(test_data)
rf_test_predicted.show(1,truncate=False)

+-----------+---------------------------------+--------------------------------------+---------------------------------------+----------+
|PassengerId|features                         |rawPrediction                         |probability                            |prediction|
+-----------+---------------------------------+--------------------------------------+---------------------------------------+----------+
|892        |[3.0,0.0,34.5,0.0,0.0,7.8292,2.0]|[18.05041587222414,1.9495841277758608]|[0.902520793611207,0.09747920638879304]|0.0       |
+-----------+---------------------------------+--------------------------------------+---------------------------------------+----------+
only showing top 1 row



In [11]:
rf_test_predicted.select("PassengerId", col("prediction").alias('Survived')).write.csv("output",mode='overwrite',header=True)

### Cluster most probabable survivors

In [12]:
def extract_survival_prob(probability):
    return probability.values[1].item() 

extract_survival_prob_udf = functions.udf(extract_survival_prob, FloatType())

survival_prob_udf_df = rf_test_predicted.select('*', extract_survival_prob_udf(rf_test_predicted['probability']).alias('survival_prob_udf'))
# survival_prob_udf_df.show(1)

survival_prob_df = survival_prob_udf_df.withColumn('probability', survival_prob_udf_df['survival_prob_udf']).drop('survival_prob_udf').cache()
survival_prob_df.show(1)

+-----------+--------------------+--------------------+-----------+----------+
|PassengerId|            features|       rawPrediction|probability|prediction|
+-----------+--------------------+--------------------+-----------+----------+
|        892|[3.0,0.0,34.5,0.0...|[18.0504158722241...| 0.09747921|       0.0|
+-----------+--------------------+--------------------+-----------+----------+
only showing top 1 row



In [13]:
likely_survivors_df = survival_prob_df.filter("probability > 0.8")

likely_survivors_df=likely_survivors_df.drop('probability','rawPrediction','prediction')
likely_survivors_df.show(5)

+-----------+--------------------+
|PassengerId|            features|
+-----------+--------------------+
|        904|[1.0,1.0,23.0,1.0...|
|        906|[1.0,1.0,47.0,1.0...|
|        907|[2.0,1.0,24.0,1.0...|
|        914|[1.0,1.0,40.91836...|
|        916|[1.0,1.0,48.0,1.0...|
+-----------+--------------------+
only showing top 5 rows



In [14]:
from pyspark.ml.clustering import GaussianMixture

gmm = GaussianMixture().setK(2).setSeed(607262)
gmm_model = gmm.fit(likely_survivors_df)
print assembler.getInputCols()
gmm_model.gaussiansDF.select('mean').show(5, truncate=False)
# gmm_model.gaussiansDF.write.csv("output")

['Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
+---------------------------------------------------------------------------------------------------------------------------------------+
|mean                                                                                                                                   |
+---------------------------------------------------------------------------------------------------------------------------------------+
|[2.0882353113706267,0.8823531137782782,21.613472813187634,0.5294116726332617,0.8235293772121892,25.108456293919506,2.7647058363349806] |
|[1.0000001386082993,0.9999998614345726,41.204926539856274,0.5000000692945482,0.39583341708660036,109.44912135220906,1.8125001645715564]|
+---------------------------------------------------------------------------------------------------------------------------------------+



In [15]:
gmm_model.weights

[0.4146340652098827, 0.5853659347901172]

In [16]:
gmm_model.summary.clusterSizes

[34, 48]