In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("titanic").getOrCreate()

24/01/18 23:04:47 WARN Utils: Your hostname, Savvass-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.193.99 instead (on interface en0)
24/01/18 23:04:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/18 23:05:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Data Preprocessing and Feature Engineering

In [3]:
data = spark.read.csv("titanic.csv", header=True, inferSchema=True)

In [4]:
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [5]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
data.count()

891

In [7]:
print("COUNTS")
print("------")
for item, col in zip(data.describe().head(1)[0][1:], data.columns):
    print(f"{col} : {item}")

COUNTS
------


24/01/18 23:05:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 6:>                                                          (0 + 1) / 1]

PassengerId : 891
Survived : 891
Pclass : 891
Name : 891
Sex : 891
Age : 714
SibSp : 891
Parch : 891
Ticket : 891
Fare : 891
Cabin : 204
Embarked : 889


[Stage 8:>                                                          (0 + 1) / 1]                                                                                

In [8]:
# Select columns for model

data_cols = data.select([
    "Survived",
    "Pclass",
    "Sex",
    "Age",
    "SibSp",
    "Parch",
    "Fare",
    "Embarked"
])

In [9]:
# The final dataset: drop all nan entries
final_dataset = data_cols.na.drop()

In [10]:
final_dataset.describe().show()

+-------+------------------+------------------+------+-----------------+------------------+-------------------+------------------+--------+
|summary|          Survived|            Pclass|   Sex|              Age|             SibSp|              Parch|              Fare|Embarked|
+-------+------------------+------------------+------+-----------------+------------------+-------------------+------------------+--------+
|  count|               712|               712|   712|              712|               712|                712|               712|     712|
|   mean|0.4044943820224719| 2.240168539325843|  NULL|29.64209269662921|0.5140449438202247|0.43258426966292135| 34.56725140449432|    NULL|
| stddev|0.4911389472541192|0.8368543166903446|  NULL|14.49293290032352|0.9306921267673427| 0.8541814457454133|52.938648174710906|    NULL|
|    min|                 0|                 1|female|             0.42|                 0|                  0|               0.0|       C|
|    max|           

### One hot encode categorical data

In [11]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                                OneHotEncoder, StringIndexer)



In [12]:
# Assign a number for each category in the Sex column
sex_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')

# One hot encoding: convert the numerical indexes into a vector
sex_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

In [13]:
embarked_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndex')
embarked_encoder = OneHotEncoder(inputCol='EmbarkedIndex', outputCol='EmbarkedVec')

In [14]:
# Assemble all of the above
assembler = VectorAssembler(
    inputCols=["Pclass", "SexVec", "Age", "SibSp", "Parch", "Fare", "EmbarkedVec"],
    outputCol="features",
)

## Create a Logistic Regression Model and a Pipeline

In [15]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [16]:
logreg = LogisticRegression(featuresCol='features', labelCol='Survived')

In [17]:
pipeline = Pipeline(
    stages=[
        sex_indexer,
        embarked_indexer,
        sex_encoder,
        embarked_encoder,
        assembler,
        logreg,
    ]
)

In [18]:
train_data, test_data = final_dataset.randomSplit([0.7, 0.3])

In [19]:
fitted_model = pipeline.fit(train_data)

24/01/18 23:05:35 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/01/18 23:05:35 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


## Performance Evaluation

In [20]:
results = fitted_model.transform(test_data)

In [21]:
results.show(5)

+--------+------+------+----+-----+-----+--------+--------+--------+-------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|    Fare|Embarked|SexIndex|EmbarkedIndex|       SexVec|  EmbarkedVec|            features|       rawPrediction|         probability|prediction|
+--------+------+------+----+-----+-----+--------+--------+--------+-------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|       0|     1|female|50.0|    0|    0| 28.7125|       C|     1.0|          1.0|    (1,[],[])|(2,[1],[1.0])|(8,[0,2,5,7],[1.0...|[-2.6767973731730...|[0.06435645277012...|       1.0|
|       0|     1|  male|36.0|    0|    0|  40.125|       C|     0.0|          1.0|(1,[0],[1.0])|(2,[1],[1.0])|[1.0,1.0,36.0,0.0...|[-0.6498080763498...|[0.34303278818562...|       1.0|
|       0|     1|  male|36.0|    1|    0|   78.85|       S|     0.0|       

In [22]:
results.select('Survived', 'prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [24]:
model_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [25]:
# Area under curve
model_eval.evaluate(results)

0.8225806451612903