In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit, monotonically_increasing_id

In [2]:
# Create (or reuse) a Spark session running on a local master for this notebook.
spark_session = (
    SparkSession.builder
    .master("local")
    .appName("Titanic Classification")
    .getOrCreate()
)

In [3]:
# Read both CSV files, treating the first line as a header and letting Spark infer column types.
train = spark_session.read.options(header = True, inferSchema = True).csv('train.csv')
test = spark_session.read.options(header = True, inferSchema = True).csv('test.csv')

In [4]:
# Display the first 5 rows of the training frame.
train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [5]:
# Print the column names and the types Spark inferred for them.
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



<h2> Data Wrangling </h2>

<b> Cabin column has 77% NA values, so drop it. <br>
Age column has 20% NA values, so try to impute it. </b>

In [6]:
# Report, for every column of `train`, the number and percentage of null values.
# The total row count is the same for every column, so compute it once instead
# of launching an additional Spark count job on each loop iteration.
total_rows = train.count()
for name in train.schema.names:
    na_count = train.filter(train[name].isNull()).count()
    # Guard against an empty frame so the report does not divide by zero.
    na_perc = round(100 * na_count / total_rows, 2) if total_rows else 0.0
    print(name + ": " + str(na_count) + " " + str(na_perc))

PassengerId: 0 0.0
Survived: 0 0.0
Pclass: 0 0.0
Name: 0 0.0
Sex: 0 0.0
Age: 177 19.87
SibSp: 0 0.0
Parch: 0 0.0
Ticket: 0 0.0
Fare: 0 0.0
Cabin: 687 77.1
Embarked: 2 0.22


In [7]:
# Remove the 'Cabin' column from both the training and the test frame.
train, test = (frame.drop('Cabin') for frame in (train, test))

In [8]:
# Discard the training rows whose 'Embarked' value is missing.
train = train.na.drop(subset = ['Embarked'])

In [9]:
# Extract the run of letters that directly precedes a '.' in Name (the title,
# e.g. "Mr", "Mrs", "Miss") into a new 'Initial' column.
# The pattern is a raw string so that `\.` reaches the regex engine as an escaped
# dot without depending on Python passing an unrecognised escape sequence through
# (which triggers an invalid-escape warning on recent Python versions).
train = train.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))
train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|Initial|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|     Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|    Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|   Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|    Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|       S|     Mr|
+-----------+--------+------+--------------------+------+----+--

In [10]:
# Count how many passengers carry each extracted title.
grpby_initial = train.groupBy('Initial').count()
grpby_initial.show()

+--------+-----+
| Initial|count|
+--------+-----+
|     Don|    1|
|    Miss|  181|
|Countess|    1|
|     Col|    2|
|     Rev|    6|
|    Lady|    1|
|  Master|   40|
|     Mme|    1|
|    Capt|    1|
|      Mr|  517|
|      Dr|    7|
|     Mrs|  124|
|     Sir|    1|
|Jonkheer|    1|
|    Mlle|    2|
|   Major|    2|
|      Ms|    1|
+--------+-----+



In [11]:
# Remove the temporary 'Initial' column again; the imputation below is based on 'Sex' instead.
train = train.drop('Initial')

<b> Uncertainty about initials. So, </b>
<b> imputing missing ages using the average age of male & female passengers </b>

In [12]:
# Mean age per sex, rounded to a whole number; used below to impute missing ages.
grpby_sex = train.groupBy('Sex')
avg_age_vals = grpby_sex.avg('Age').collect()
# Rows collected from a groupBy carry no guaranteed order, so look each average up
# by its 'Sex' value instead of assuming "female" is row 0 and "male" is row 1.
avg_age_by_sex = {row['Sex']: row['avg(Age)'] for row in avg_age_vals}
mean_female_age = round(avg_age_by_sex['female'], 0)
mean_male_age = round(avg_age_by_sex['male'], 0)
print("Avg. female age: {favg} & Avg. male age: {mavg}".format(favg = mean_female_age, mavg = mean_male_age))

Avg. female age: 28.0 & Avg. male age: 31.0


In [13]:
def _impute_age(frame):
    """Return `frame` with null 'Age' values replaced by the mean age for the row's sex.

    Uses `mean_male_age` / `mean_female_age` computed from the training data.
    Rows whose 'Sex' is neither 'male' nor 'female' keep their original 'Age'.
    """
    age_is_missing = frame['Age'].isNull()
    return frame.withColumn(
        'Age',
        when(age_is_missing & (frame['Sex'] == 'male'), mean_male_age)
        .when(age_is_missing & (frame['Sex'] == 'female'), mean_female_age)
        .otherwise(frame['Age']))


train = _impute_age(train)
# 'Age' is also a model feature for the test frame, which previously never went
# through this imputation. Fill it with the same training-set means so both frames
# are prepared identically.
# NOTE(review): this notebook never reports nulls for `test`; other test columns
# (e.g. 'Fare') may still need handling before VectorAssembler — TODO confirm.
test = _impute_age(test)

In [14]:
# Re-run the per-column null report to confirm the imputation left no missing values.
# The row count is loop-invariant, so it is computed once rather than per column.
total_rows = train.count()
for name in train.schema.names:
    na_count = train.filter(train[name].isNull()).count()
    # Guard against an empty frame so the report does not divide by zero.
    na_perc = round(100 * na_count / total_rows, 2) if total_rows else 0.0
    print(name + ": " + str(na_count) + " " + str(na_perc))

PassengerId: 0 0.0
Survived: 0 0.0
Pclass: 0 0.0
Name: 0 0.0
Sex: 0 0.0
Age: 0 0.0
SibSp: 0 0.0
Parch: 0 0.0
Ticket: 0 0.0
Fare: 0 0.0
Embarked: 0 0.0


<h2> Exploratory Data Analysis </h2>

In [15]:
# Group passengers by ticket class; this grouped object is reused by later cells
# for per-class averages and sums.
grpby_pclass = train.groupBy('Pclass')
grpby_pclass.count().show()

+------+-----+
|Pclass|count|
+------+-----+
|     1|  214|
|     3|  491|
|     2|  184|
+------+-----+



In [16]:
# Rebuild the per-sex grouping on the current `train` frame (the earlier
# `grpby_sex` was created before the Age imputation) and show the group sizes.
grpby_sex = train.groupBy('Sex')
grpby_sex.count().show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  312|
|  male|  577|
+------+-----+



In [17]:
# Mean age per sex, computed on the imputed training frame.
grpby_sex.avg('Age').show()

+------+------------------+
|   Sex|          avg(Age)|
+------+------------------+
|female| 27.78846153846154|
|  male|30.785389948006937|
+------+------------------+



<b> Observations: </b><br>
    <i> Average age of passengers in first class is higher than that of passengers in second & third class </i><br><br>
<b> Insights: </b>   
    <i> There might be a relationship between the wealth of a passenger and their age.<br>
    We may say that it took more time to accumulate wealth or to complete education in those days. </i>

In [18]:
# Mean age per ticket class.
grpby_pclass.avg('Age').show()

+------+------------------+
|Pclass|          avg(Age)|
+------+------------------+
|     1|36.983271028037386|
|     3|26.506965376782077|
|     2| 29.91211956521739|
+------+------------------+



In [19]:
# Mean fare per ticket class.
grpby_pclass.avg('Fare').show()

+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.19351635514012|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+



In [20]:
# Number of passengers per port of embarkation.
grpby_embarked = train.groupBy('Embarked')
grpby_embarked.count().show()
# Port codes used in the 'Embarked' column:
# Q: Queenstown
# C: Cherbourg
# S: Southampton

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|       C|  168|
|       S|  644|
+--------+-----+



In [21]:
# Total of the 'Parch' (parents/children aboard) column per sex.
grpby_sex.sum('Parch').show()

+------+----------+
|   Sex|sum(Parch)|
+------+----------+
|female|       204|
|  male|       136|
+------+----------+



<b> Passengers in third class were travelling with families, since that class has the highest number of parents/children and siblings/spouses on board </b>

In [22]:
# Total of the 'Parch' (parents/children aboard) column per ticket class.
grpby_pclass.sum('Parch').show()

+------+----------+
|Pclass|sum(Parch)|
+------+----------+
|     1|        77|
|     3|       193|
|     2|        70|
+------+----------+



In [23]:
# Total of the 'SibSp' (siblings/spouses aboard) column per ticket class.
grpby_pclass.sum('SibSp').show()

+------+----------+
|Pclass|sum(SibSp)|
+------+----------+
|     1|        90|
|     3|       302|
|     2|        74|
+------+----------+



<b>Q: Queenstown C: Cherbourg S: Southampton <br>
Observations: <br> </b>
<i>Port C: Mostly first class, followed by third class; few second-class passengers <br>
Port Q: Predominantly in third class <br>
Port S: All of the classes but majority in third class </i><br><br>
<b>Insights: <br></b>
<i>Port Q: Rural area or under developed region <br>
Port S: City or developed region <br>
Port C: City or developed region </i>

In [24]:
# Passenger counts for every (port, ticket class) pair, ordered by port then class.
grpby_embarked_pclass = train.groupBy('Embarked', 'Pclass')
embarked_pclass_counts = grpby_embarked_pclass.count()
embarked_pclass_counts.sort('Embarked', 'Pclass').show()

+--------+------+-----+
|Embarked|Pclass|count|
+--------+------+-----+
|       C|     1|   85|
|       C|     2|   17|
|       C|     3|   66|
|       Q|     1|    2|
|       Q|     2|    3|
|       Q|     3|   72|
|       S|     1|  127|
|       S|     2|  164|
|       S|     3|  353|
+--------+------+-----+



In [25]:
# Passenger counts for every (port, sex) pair, ordered by port then sex.
grpby_embarked_sex = train.groupBy('Embarked', 'Sex')
embarked_sex_counts = grpby_embarked_sex.count()
embarked_sex_counts.sort('Embarked', 'Sex').show()

+--------+------+-----+
|Embarked|   Sex|count|
+--------+------+-----+
|       C|female|   73|
|       C|  male|   95|
|       Q|female|   36|
|       Q|  male|   41|
|       S|female|  203|
|       S|  male|  441|
+--------+------+-----+



In [26]:
# Preview the cleaned training frame before feature engineering.
train.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|       S|
|          6|       0|     3|    Moran, Mr. James|  male|31.0|    0|    0|          330877| 8.4583|       Q|
|          7|      

<h3> Preparing dataset for model learning </h3><br>
<i> 
    * Feature Engineering - Combine parent-child and Siblings-spouse columns <br>
    * Feature Engineering - Indexers to convert binary categorical columns to numeric indexes <br>
    * (Train, Test) = (70, 30)
</i><br>

In [27]:
# List the columns currently present in `train`.
print(train.columns)

['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Embarked']


In [28]:
# Family size = parents/children aboard + siblings/spouses aboard, added to both frames.
family_size = col('Parch') + col('SibSp')
train, test = (frame.withColumn('Family_Size', family_size) for frame in (train, test))

<b>
    Convert family size to binary categorical variable, <br>
</b> <br>    
<i>
    * 0 for passenger travelling alone <br>
    * 1 for passenger travelling with family
</i>

In [29]:
# Binary flag derived from Family_Size: 0 when it equals 0, otherwise 1.
family_flag = when(col('Family_Size') == 0, 0).otherwise(1)
train = train.withColumn('Family_Flag', family_flag)
test = test.withColumn('Family_Flag', family_flag)

<b> Indexers for categorical columns </b>

In [30]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [31]:
# Fit one StringIndexer per categorical column on the TRAINING data only, and reuse
# those same fitted models for the test data. StringIndexer assigns indices by label
# frequency, so fitting a separate set on `test` could encode the same category with
# a different index than the one the classifiers are trained on.
# NOTE(review): a category present in `test` but absent from `train` would raise
# under the default handleInvalid setting — TODO confirm the test labels.
categorical_features = ['Sex', 'Embarked']
indexer = [
    StringIndexer(inputCol = cat_feature, outputCol = cat_feature + '_dummy').fit(train)
    for cat_feature in categorical_features
]
# Kept under its original name so the cell that builds the test pipeline from
# `indexer_test` keeps working unchanged.
indexer_test = indexer

In [32]:
# Wrap each list of fitted indexer models in a Pipeline and apply it, adding the
# '<column>_dummy' index columns to the corresponding frame.
train_indexing = Pipeline(stages = indexer).fit(train)
train = train_indexing.transform(train)
test_indexing = Pipeline(stages = indexer_test).fit(test)
test = test_indexing.transform(test)

In [33]:
# Compare the original categorical values with the numeric indices assigned to them.
train.select(['Embarked', 'Embarked_dummy', 'Sex_dummy', 'Sex']).show()

+--------+--------------+---------+------+
|Embarked|Embarked_dummy|Sex_dummy|   Sex|
+--------+--------------+---------+------+
|       S|           0.0|      0.0|  male|
|       C|           1.0|      1.0|female|
|       S|           0.0|      1.0|female|
|       S|           0.0|      1.0|female|
|       S|           0.0|      0.0|  male|
|       Q|           2.0|      0.0|  male|
|       S|           0.0|      0.0|  male|
|       S|           0.0|      0.0|  male|
|       S|           0.0|      1.0|female|
|       C|           1.0|      1.0|female|
|       S|           0.0|      1.0|female|
|       S|           0.0|      1.0|female|
|       S|           0.0|      0.0|  male|
|       S|           0.0|      0.0|  male|
|       S|           0.0|      1.0|female|
|       S|           0.0|      1.0|female|
|       Q|           2.0|      0.0|  male|
|       S|           0.0|      0.0|  male|
|       S|           0.0|      1.0|female|
|       C|           1.0|      1.0|female|
+--------+-

<b> Indexer Output </b> <br>
<b> Indexer assigns 0.0 to the most frequent label in the feature </b> <br>
<i>
    * for sex = {male - 577, female - 312}, male = 0.0, female = 1.0
    * for embarked = {S - 644, Q - 77, C - 168}, S = 0.0, C = 1.0, Q = 2.0
</i>    

In [34]:
# List every column now present in `train`, including the engineered ones.
print('Features = {}'.format(train.columns))

Features = ['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Embarked', 'Family_Size', 'Family_Flag', 'Sex_dummy', 'Embarked_dummy']


In [35]:
# Null report including the engineered columns, to confirm none of them introduced nulls.
# The row count is loop-invariant, so it is computed once rather than per column.
total_rows = train.count()
for name in train.schema.names:
    na_count = train.filter(train[name].isNull()).count()
    # Guard against an empty frame so the report does not divide by zero.
    na_perc = round(100 * na_count / total_rows, 2) if total_rows else 0.0
    print(name + ": " + str(na_count) + " " + str(na_perc))

PassengerId: 0 0.0
Survived: 0 0.0
Pclass: 0 0.0
Name: 0 0.0
Sex: 0 0.0
Age: 0 0.0
SibSp: 0 0.0
Parch: 0 0.0
Ticket: 0 0.0
Fare: 0 0.0
Embarked: 0 0.0
Family_Size: 0 0.0
Family_Flag: 0 0.0
Sex_dummy: 0 0.0
Embarked_dummy: 0 0.0


<b> Drop labels </b> <br>
<i>
    * PassengerId
    * Name
    * Sex
    * SibSp
    * Parch
    * Ticket
    * Cabin
    * Embarked
    * Family_Size
</i>    

In [36]:
# Element 0 is the label; the remaining names are the model inputs.
# The ticket-class column is named 'Pclass' in the data (see the printed schema), so
# use that exact spelling instead of relying on Spark's case-insensitive column
# resolution, which would break if spark.sql.caseSensitive were enabled.
features = ['Survived', 'Pclass', 'Age', 'Fare', 'Embarked_dummy', 'Sex_dummy', 'Family_Flag']
new_train = train.select(features)
# The test frame has no 'Survived' column, so select the input columns only.
new_test = test.select(features[1:])

In [37]:
# Inspect the reduced training frame: schema, a sample of rows, and a null report.
new_train.printSchema()
new_train.show()

# Count once (loop-invariant) and test each column of `new_train` itself; the
# previous version built the null filter from a column of the parent `train` frame.
total_rows = new_train.count()
for name in new_train.schema.names:
    na_count = new_train.filter(new_train[name].isNull()).count()
    # Guard against an empty frame so the report does not divide by zero.
    na_perc = round(100 * na_count / total_rows, 2) if total_rows else 0.0
    print(name + ": " + str(na_count) + " " + str(na_perc))

root
 |-- Survived: integer (nullable = true)
 |-- PClass: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked_dummy: double (nullable = false)
 |-- Sex_dummy: double (nullable = false)
 |-- Family_Flag: integer (nullable = false)

+--------+------+----+-------+--------------+---------+-----------+
|Survived|PClass| Age|   Fare|Embarked_dummy|Sex_dummy|Family_Flag|
+--------+------+----+-------+--------------+---------+-----------+
|       0|     3|22.0|   7.25|           0.0|      0.0|          1|
|       1|     1|38.0|71.2833|           1.0|      1.0|          1|
|       1|     3|26.0|  7.925|           0.0|      1.0|          0|
|       1|     1|35.0|   53.1|           0.0|      1.0|          1|
|       0|     3|35.0|   8.05|           0.0|      0.0|          0|
|       0|     3|31.0| 8.4583|           2.0|      0.0|          0|
|       0|     1|54.0|51.8625|           0.0|      0.0|          0|
|       0|     3| 2.0| 21.075

<b> Train test split, 70:30 </b>

In [38]:
from pyspark.ml.feature import VectorAssembler

In [39]:
# Pack every column except the first (the 'Survived' label) into one vector column.
input_columns = new_train.columns[1:]
vector_assembler = VectorAssembler(inputCols = input_columns, outputCol = 'feat_vector')
dataset = vector_assembler.transform(new_train)

In [40]:
# Check that the assembled 'feat_vector' column was appended to the frame.
dataset.printSchema()
dataset.show()

root
 |-- Survived: integer (nullable = true)
 |-- PClass: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked_dummy: double (nullable = false)
 |-- Sex_dummy: double (nullable = false)
 |-- Family_Flag: integer (nullable = false)
 |-- feat_vector: vector (nullable = true)

+--------+------+----+-------+--------------+---------+-----------+--------------------+
|Survived|PClass| Age|   Fare|Embarked_dummy|Sex_dummy|Family_Flag|         feat_vector|
+--------+------+----+-------+--------------+---------+-----------+--------------------+
|       0|     3|22.0|   7.25|           0.0|      0.0|          1|[3.0,22.0,7.25,0....|
|       1|     1|38.0|71.2833|           1.0|      1.0|          1|[1.0,38.0,71.2833...|
|       1|     3|26.0|  7.925|           0.0|      1.0|          0|[3.0,26.0,7.925,0...|
|       1|     1|35.0|   53.1|           0.0|      1.0|          1|[1.0,35.0,53.1,0....|
|       0|     3|35.0|   8.05|           0.

In [41]:
# Keep only what the classifiers read: the feature vector and the label.
final_dataset = dataset.select('feat_vector', 'Survived')
final_dataset.printSchema()

root
 |-- feat_vector: vector (nullable = true)
 |-- Survived: integer (nullable = true)



In [42]:
# Random 70/30 split of the labelled data; the fixed seed keeps the split repeatable.
split_weights = [0.7, 0.3]
trainSet, testSet = final_dataset.randomSplit(split_weights, seed = 101)

In [43]:
# Both splits should have the same two-column schema (feature vector + label).
trainSet.printSchema()
testSet.printSchema()

root
 |-- feat_vector: vector (nullable = true)
 |-- Survived: integer (nullable = true)

root
 |-- feat_vector: vector (nullable = true)
 |-- Survived: integer (nullable = true)



<h2> Data Models</h2> <br>
<i><b> Process: </b>
    <ol>
        <li>Implement classification model</li>
        <li>Evaluate performance on test set</li>
        <li>Repeat 1 & 2 for all of the classifiers</li>
        <li>Select best performing classifier</li>
    </ol>
    
<i> Classifiers from <b>pyspark.ml.classification</b></i>
* Logistic Regression
* Decision Tree
* Random Forest
* Gradient Boost Tree
* Naive Bayes
* Support Vector Machine

<h3> Logistic Regression </h3>

In [44]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [45]:
# Logistic regression reading features from 'feat_vector' and labels from 'Survived';
# no other parameters are set here.
log_reg_classifier = LogisticRegression(featuresCol = 'feat_vector', labelCol = 'Survived')

In [46]:
# Train the logistic-regression model on the training split.
log_reg_fit = log_reg_classifier.fit(trainSet)

In [85]:
# Predictions
# evaluate() returns a summary object; its `.predictions` attribute is the scored
# held-out split as a DataFrame.
predictions = log_reg_fit.evaluate(testSet)
# Later cells read `log_reg_predictions`, but no cell in this notebook assigns it
# (it only existed as leftover kernel state). Define it here so the notebook
# survives Restart Kernel -> Run All.
log_reg_predictions = predictions.predictions

In [48]:
# Print the schema of the scored held-out split, reached through both names.
log_reg_predictions.printSchema()
predictions.predictions.printSchema()

root
 |-- feat_vector: vector (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

root
 |-- feat_vector: vector (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [49]:
# Compare predicted and actual labels for the first 5 scored rows.
log_reg_predictions.select(['prediction', 'Survived', 'feat_vector']).show(5)

+----------+--------+--------------------+
|prediction|Survived|         feat_vector|
+----------+--------+--------------------+
|       0.0|       0|(6,[0,1],[2.0,31.0])|
|       0.0|       0|(6,[0,1],[2.0,31.0])|
|       0.0|       0|(6,[0,1],[2.0,31.0])|
|       0.0|       1|(6,[0,1],[3.0,25.0])|
|       1.0|       0|[1.0,2.0,151.55,0...|
+----------+--------+--------------------+
only showing top 5 rows



In [83]:
# Evaluator for the logistic-regression scores; no metricName is set, so it uses
# BinaryClassificationEvaluator's default metric.
bin_evaluator = BinaryClassificationEvaluator(rawPredictionCol = 'rawPrediction', labelCol = 'Survived')
# The classifier cells below call `evaluator.evaluate(...)` and store the result in
# `*_accuracy` variables, but no cell in this notebook defines `evaluator` (it only
# existed as leftover kernel state). Define it here as an accuracy evaluator so the
# notebook survives Restart Kernel -> Run All.
# NOTE(review): 'accuracy' is inferred from the `*_accuracy` variable names — confirm.
evaluator = MulticlassClassificationEvaluator(labelCol = 'Survived', predictionCol = 'prediction',
                                              metricName = 'accuracy')

In [84]:
# Score the logistic-regression predictions on the held-out split with `bin_evaluator`.
bin_eval = bin_evaluator.evaluate(predictions.predictions)
print(bin_eval)

0.8232952578746979


<h3> Decision Tree </h3>

In [52]:
from pyspark.ml.classification import DecisionTreeClassifier

In [53]:
# Decision tree reading features from 'feat_vector' and labels from 'Survived';
# no other parameters are set here.
dec_tree = DecisionTreeClassifier(labelCol="Survived", featuresCol="feat_vector")

In [86]:
# Train the decision tree on the training split.
dec_tree_fit = dec_tree.fit(trainSet)

In [55]:
# Score the held-out split with the fitted decision tree.
dec_tree_pred = dec_tree_fit.transform(testSet)

In [56]:
# Evaluate the decision-tree predictions with the shared `evaluator` object.
dec_tree_accuracy = evaluator.evaluate(dec_tree_pred)
print(dec_tree_accuracy)

0.79182156133829


<h3> Random Forest </h3>

In [57]:
from pyspark.ml.classification import RandomForestClassifier

In [58]:
# Random forest reading features from 'feat_vector' and labels from 'Survived';
# no other parameters (including a seed) are set here.
rf = RandomForestClassifier(featuresCol = 'feat_vector', labelCol = 'Survived')

In [59]:
# Train the random forest on the training split.
rf_fit = rf.fit(trainSet)

In [60]:
# Score the held-out split with the fitted random forest.
rf_pred = rf_fit.transform(testSet)

In [61]:
# Evaluate the random-forest predictions with the shared `evaluator` object.
rf_accuracy = evaluator.evaluate(rf_pred)
print(rf_accuracy)

0.79182156133829


<h3> Gradient Boost Tree </h3>

In [62]:
from pyspark.ml.classification import GBTClassifier

In [63]:
# Gradient-boosted trees reading features from 'feat_vector' and labels from 'Survived';
# no other parameters are set here.
gbt_classifier = GBTClassifier(featuresCol = 'feat_vector', labelCol = 'Survived')

In [64]:
# Train the gradient-boosted trees on the training split.
gbt_classifier_fit = gbt_classifier.fit(trainSet)

In [65]:
# Score the held-out split with the fitted gradient-boosted trees.
gbt_classifier_pred = gbt_classifier_fit.transform(testSet)

In [66]:
# Evaluate the gradient-boosted-tree predictions with the shared `evaluator` object.
gbt_classifier_accuracy = evaluator.evaluate(gbt_classifier_pred)
print(gbt_classifier_accuracy)

0.7806691449814126


<h3> Naive Bayes</h3>

In [67]:
from pyspark.ml.classification import NaiveBayes

In [68]:
# Naive Bayes reading features from 'feat_vector' and labels from 'Survived';
# no other parameters are set here.
nb_classifier = NaiveBayes(featuresCol = 'feat_vector', labelCol = 'Survived')

In [69]:
# Train the Naive Bayes model on the training split.
nb_classifier_fit = nb_classifier.fit(trainSet)

In [70]:
# Score the held-out split with the fitted Naive Bayes model.
nb_classifier_pred = nb_classifier_fit.transform(testSet)

In [71]:
# Evaluate the Naive Bayes predictions with the shared `evaluator` object.
nb_classifier_accuracy = evaluator.evaluate(nb_classifier_pred)
print(nb_classifier_accuracy)

0.6691449814126395


<h3> Support Vector Machine </h3>

In [72]:
from pyspark.ml.classification import LinearSVC

In [73]:
# Linear support vector classifier reading features from 'feat_vector' and labels
# from 'Survived'; no other parameters are set here.
svc_classifier = LinearSVC(featuresCol = 'feat_vector', labelCol = 'Survived')

In [87]:
# Train the linear SVC on the training split.
svc_classifier_fit = svc_classifier.fit(trainSet)

In [75]:
# Score the held-out split with the fitted linear SVC.
svc_classifier_pred = svc_classifier_fit.transform(testSet)

In [76]:
# Evaluate the linear-SVC predictions with the shared `evaluator` object.
svc_classifier_accuracy = evaluator.evaluate(svc_classifier_pred)
print(svc_classifier_accuracy)

0.7769516728624535


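<b> Log Reg is performing best </b> <br>
<i> Note: the Log Reg score comes from <code>bin_evaluator</code>, while the other classifiers are scored with <code>evaluator</code>; confirm both report the same metric before comparing the numbers. <br>
Feed test data</i>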

In [77]:
# Schema of the full test frame after feature engineering (it has no 'Survived' column).
test.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Family_Flag: integer (nullable = false)
 |-- Sex_dummy: double (nullable = false)
 |-- Embarked_dummy: double (nullable = false)



In [78]:
# Assemble every column of `new_test` (feature columns only) into 'feat_vector'.
# NOTE(review): `new_test` is overwritten with the assembled frame, so re-running
# this cell would feed 'feat_vector' back in as an input column.
test_input_columns = new_test.columns
assembler = VectorAssembler(inputCols = test_input_columns, outputCol = 'feat_vector')
new_test = assembler.transform(new_test)

In [79]:
# Check that 'feat_vector' was appended to the test features.
new_test.printSchema()

root
 |-- PClass: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked_dummy: double (nullable = false)
 |-- Sex_dummy: double (nullable = false)
 |-- Family_Flag: integer (nullable = false)
 |-- feat_vector: vector (nullable = true)



In [80]:
# Apply the fitted logistic-regression model to the assembled test features.
results = log_reg_fit.transform(new_test)

In [107]:
# Schema of the scored test frame (feature columns plus the model's output columns).
results.printSchema()

root
 |-- PClass: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked_dummy: double (nullable = false)
 |-- Sex_dummy: double (nullable = false)
 |-- Family_Flag: integer (nullable = false)
 |-- feat_vector: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)
 |-- id: long (nullable = false)



In [82]:
# Attach a generated row id to both frames so they can be joined on it afterwards.
# NOTE(review): monotonically_increasing_id() derives ids from the partition index and
# the row position within the partition, so ids in `results` and `test` only refer to
# the same passenger if both frames share partitioning and row order — TODO confirm,
# or carry 'PassengerId' through the feature selection and join on that instead.
results = results.withColumn('id', monotonically_increasing_id())
test = test.withColumn('id', monotonically_increasing_id())

In [88]:
# Pair each passenger name with its predicted label by joining on the generated row id.
names_by_id = test.select('id', 'Name')
predictions_by_id = results.select('id', 'prediction')
final_results = names_by_id.join(predictions_by_id, on = 'id', how = 'inner')

In [109]:
# Schema of the final (id, Name, prediction) frame.
final_results.printSchema()

root
 |-- id: long (nullable = false)
 |-- Name: string (nullable = true)
 |-- prediction: double (nullable = false)

