# Developing a Classification Model for Survival in Spark

I used the Titanic dataset to practice working with the classes and methods in `pyspark.ml`.

In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('titanic').getOrCreate()

### Loading and checking the data

In [4]:
data = spark.read.csv('titanic.csv',inferSchema=True,header=True)

In [5]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

### Separating relevant data columns

In [7]:
final_data = data.select(['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'])

### Dropping Null Values

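The Embarked column has two null values. Note that `na.drop()` removes every row that has a null in any of the selected columns, so nulls in other columns (if present) are dropped as well.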

In [8]:
final_data.where(final_data.Embarked.isNull()).count()

2

In [9]:
final_data = final_data.na.drop()
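
As a rough illustration of what `na.drop()` does with its default `how='any'` behaviour, the plain-Python sketch below counts missing values per column and then drops every row that has a missing value in any column. The rows and the `null_counts` and `drop_null_rows` helpers are made up for illustration and are not part of PySpark.

```python
# Hypothetical rows standing in for a DataFrame; None plays the role of null.
rows = [
    {"Survived": 1, "Age": 38.0, "Embarked": "C"},
    {"Survived": 1, "Age": 62.0, "Embarked": None},  # null Embarked
    {"Survived": 0, "Age": None, "Embarked": "S"},   # null Age
    {"Survived": 0, "Age": 35.0, "Embarked": "S"},
]


def null_counts(rows):
    """Count missing values per column."""
    return {col: sum(r[col] is None for r in rows) for col in rows[0]}


def drop_null_rows(rows):
    """Keep only rows with no missing value in any column (like how='any')."""
    return [r for r in rows if all(v is not None for v in r.values())]


counts = null_counts(rows)
clean = drop_null_rows(rows)
print(counts)
print(len(rows), "rows before,", len(clean), "rows after")
```

Checking the per-column counts before dropping shows how many rows each column costs, which is easy to miss when only one column is inspected.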

### Setting up the Modeling Pipeline

#### Preprocessing the Categorical Columns by String Indexing and One Hot Encoding

In [10]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [11]:
gender_indexer = StringIndexer(inputCol='Sex',outputCol='GenderIndex')
gender_encoder = OneHotEncoder(inputCol='GenderIndex',outputCol='GenderVec')

In [12]:
embark_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')
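
To make the two steps concrete, here is a plain-Python sketch (no Spark required) of what they compute, assuming the default settings: `StringIndexer` assigns index 0 to the most frequent label, and `OneHotEncoder` drops the last category (`dropLast=True`), so a column with three categories becomes a vector of length two. The sample values and the `string_index` and `one_hot` helpers are hypothetical, and Spark returns sparse vectors rather than the plain lists used here.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index, most frequent label first."""
    ordered = [label for label, _ in Counter(values).most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """One-hot encode an index; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


# Made-up Embarked values with distinct frequencies: S=3, C=2, Q=1.
embarked = ["S", "C", "S", "Q", "S", "C"]
mapping = string_index(embarked)
encoded = {label: one_hot(idx, len(mapping)) for label, idx in mapping.items()}
print(mapping)
print(encoded)
```

Dropping the last category avoids a redundant column: the least frequent label is identified by all other positions being zero.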

In [13]:
assembler = VectorAssembler(inputCols=['Pclass',
 'GenderVec',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'EmbarkVec'],outputCol='features')

#### Initializing a Logistic Regression Model

In [14]:
from pyspark.ml.classification import LogisticRegression

In [15]:
logreg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived')

#### Setting up a Preprocessing and Modeling Pipeline

In [16]:
from pyspark.ml import Pipeline

In [17]:
pipeline = Pipeline(stages=[gender_indexer,embark_indexer,
                           gender_encoder,embark_encoder,
                           assembler,logreg_titanic])

#### Splitting the Data into Train and Test Sets

In [18]:
train_data, test_data = final_data.randomSplit([0.7, 0.3])
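
`randomSplit` assigns rows at random, so the 70/30 proportions are approximate and the split changes between runs unless a seed is supplied through the method's optional `seed` argument. The plain-Python sketch below mimics the idea with a hypothetical `random_split` helper; it is an illustration, not Spark's implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to a split with probability proportional to its weight."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total  # weights are normalized to sum to 1
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any floating-point remainder.
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train_a, test_a = random_split(rows, [0.7, 0.3], seed=42)
train_b, test_b = random_split(rows, [0.7, 0.3], seed=42)
print(len(train_a), len(test_a))
print(train_a == train_b)  # same seed, same split
```

In the notebook, the equivalent would be `final_data.randomSplit([0.7, 0.3], seed=42)`, where the seed value 42 is an arbitrary choice.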

#### Fitting the Model to the Train Data

In [19]:
fit_model = pipeline.fit(train_data)

#### Running the Model on the Test Data

In [20]:
results = fit_model.transform(test_data)
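
For a `LogisticRegression` stage, `transform` appends `rawPrediction`, `probability`, and `prediction` columns (the default output column names). A minimal plain-Python sketch of how the three relate for a single row in the binary case, using made-up coefficients and a hypothetical `score_row` helper:

```python
import math


def score_row(features, coefficients, intercept, threshold=0.5):
    """Compute the three logistic-regression outputs for one row."""
    # Linear score (log-odds of the positive class).
    margin = sum(w * x for w, x in zip(coefficients, features)) + intercept
    # Sigmoid turns the log-odds into a probability of class 1.
    p1 = 1.0 / (1.0 + math.exp(-margin))
    return {
        "rawPrediction": [-margin, margin],
        "probability": [1.0 - p1, p1],
        "prediction": 1.0 if p1 > threshold else 0.0,
    }


# Illustrative numbers only; they are not taken from the fitted model.
row = score_row(features=[1.0, 2.0], coefficients=[0.5, -0.25], intercept=0.2)
print(row)
```

The `prediction` column is the only one of the three that discards how confident the model is, which matters for ranking-based metrics such as the area under the ROC curve.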

#### Evaluating the Predictions using the BinaryClassificationEvaluator

In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [22]:
test_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Survived')

In [23]:
results.select('Survived','prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [24]:
AUC = test_eval.evaluate(results)

In [25]:
AUC

0.7916745416745417

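`BinaryClassificationEvaluator` uses `areaUnderROC` as its default metric. The value ranges from 0 to 1: 0.5 corresponds to random guessing, and values closer to 1 are better. Because the evaluator here was given the hard 0/1 `prediction` column rather than `rawPrediction`, the score of about 0.79 reflects a single decision threshold rather than the full ROC curve. On that basis, the model separates survivors from non-survivors reasonably well.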
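
A plain-Python sketch of why the choice of `rawPredictionCol` matters: when the evaluator receives hard 0/1 labels, the ROC curve has a single interior point, and the area under it (assuming the usual trapezoidal area) reduces to the average of the true-positive and true-negative rates. Scoring continuous outputs instead uses the full ranking. The labels and probabilities below are made up, and `auc` is a hypothetical helper, not a PySpark function.

```python
def auc(labels, scores):
    """Area under the ROC curve via pairwise comparison (ties count as half)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Illustrative data only.
labels = [1, 1, 1, 0, 0, 0, 0]
probs = [0.9, 0.6, 0.4, 0.7, 0.3, 0.2, 0.1]
hard = [1.0 if p >= 0.5 else 0.0 for p in probs]

# True-positive and true-negative rates of the hard predictions.
tpr = sum(h == 1.0 for y, h in zip(labels, hard) if y == 1) / labels.count(1)
tnr = sum(h == 0.0 for y, h in zip(labels, hard) if y == 0) / labels.count(0)

auc_hard = auc(labels, hard)
auc_soft = auc(labels, probs)
print(auc_hard, (tpr + tnr) / 2)  # the two values agree
print(auc_soft)
```

In the notebook, passing `rawPredictionCol='rawPrediction'` (the evaluator's default) would score the model on its continuous output; the resulting value would generally differ from the one reported above.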