In [0]:
df = sqlContext.sql('SELECT * FROM titanic_csv')

In [0]:
%sql
SELECT SUM(CASE WHEN cabin is NULL THEN 1 ELSE 0 END) as cabin_nulls
FROM titanic_csv

cabin_nulls
687


In [0]:
df.columns

Out[6]: ['_c0',
 'PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [0]:
num_null_dict = {}

for column in df.columns:
    num_null_dict[column] = sqlContext.sql(f'SELECT SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) AS num_null_{column} FROM titanic_csv')

In [0]:
for _,df in num_null_dict.items():
    df.show()

+------------+
|num_null__c0|
+------------+
|           0|
+------------+

+--------------------+
|num_null_PassengerId|
+--------------------+
|                   0|
+--------------------+

+-----------------+
|num_null_Survived|
+-----------------+
|                0|
+-----------------+

+---------------+
|num_null_Pclass|
+---------------+
|              0|
+---------------+

+-------------+
|num_null_Name|
+-------------+
|            0|
+-------------+

+------------+
|num_null_Sex|
+------------+
|           0|
+------------+

+------------+
|num_null_Age|
+------------+
|         177|
+------------+

+--------------+
|num_null_SibSp|
+--------------+
|             0|
+--------------+

+--------------+
|num_null_Parch|
+--------------+
|             0|
+--------------+

+---------------+
|num_null_Ticket|
+---------------+
|              0|
+---------------+

+-------------+
|num_null_Fare|
+-------------+
|            0|
+-------------+

+--------------+
|num_null_Cabin|
+----

In [0]:
df.printSchema()

root
 |-- num_null_Embarked: long (nullable = true)



In [0]:
df.columns

Out[20]: ['_c0',
 'PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [0]:
my_cols = df.select(['Survived','Pclass','Age','Sibsp','Parch','Fare','Sex','Embarked'])

In [0]:
my_final_data = my_cols.na.drop()
my_final_data.show(3)

+--------+------+----+-----+-----+-------+------+--------+
|Survived|Pclass| Age|Sibsp|Parch|   Fare|   Sex|Embarked|
+--------+------+----+-----+-----+-------+------+--------+
|       0|     3|22.0|    1|    0|   7.25|  male|       S|
|       1|     1|38.0|    1|    0|71.2833|female|       C|
|       1|     3|26.0|    0|    0|  7.925|female|       S|
+--------+------+----+-----+-----+-------+------+--------+
only showing top 3 rows



In [0]:
from pyspark.ml.feature import (VectorAssembler,
                               VectorIndexer,
                               OneHotEncoder,
                               StringIndexer)

In [0]:
gender_indexer = StringIndexer(inputCol = 'Sex',
                              outputCol = 'Sex_index')
gender_encoder = OneHotEncoder(inputCol = 'Sex_index',
                               outputCol = 'Sex_OHE')

In [0]:
embarked_indexer = StringIndexer(inputCol = 'Embarked',
                              outputCol = 'Embarked_index')
embarked_encoder = OneHotEncoder(inputCol = 'Embarked_index',
                                 outputCol = 'Embarked_OHE')

In [0]:
my_final_data.columns

Out[81]: ['Survived', 'Pclass', 'Age', 'Sibsp', 'Parch', 'Fare', 'Sex', 'Embarked']

In [0]:
assembler = VectorAssembler(inputCols =['Pclass','Age','Sibsp','Parch','Fare','Sex_OHE','Embarked_OHE'],
                            outputCol = 'features')

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [0]:
log_reg_titanic = LogisticRegression(featuresCol ='features',
                                     labelCol ='Survived')

In [0]:
pipeline = Pipeline(stages = [gender_indexer,
                             embarked_indexer,
                             gender_encoder,
                             embarked_encoder,
                             assembler,
                             log_reg_titanic])

In [0]:
train_data , test_data = my_final_data.randomSplit([0.7,0.3])

In [0]:
fit_model = pipeline.fit(train_data)

In [0]:
train_results = fit_model.transform(train_data)
test_results = fit_model.transform(test_data)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                          labelCol='Survived')

In [0]:
train_results.select('Survived','prediction').show(5)

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
+--------+----------+
only showing top 5 rows



In [0]:
train_auc = evaluator.evaluate(train_results)
test_auc = evaluator.evaluate(test_results)

In [0]:
train_auc , test_auc

Out[110]: (0.7901616569840868, 0.7860008445945945)

In [0]:
test_eval.