# Titanic Project

### Initializing Spark

In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.1.1-bin-hadoop3.2')

In [2]:
from pyspark.sql import SparkSession

In [3]:
#Initializing Spark Session
spark = SparkSession.builder.appName('lr').getOrCreate()

### Importing data

In [4]:
data = spark.read.csv('titanic.csv',inferSchema=True,header=True)

In [5]:
data.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

## Exploratory Data Analysis

In [6]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

### Missing Values

In [8]:
from pyspark.sql.functions import isnan, when, count, col

In [9]:
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



### Shape of the dataset

In [10]:
print((data.count(),len(data.columns)))

(891, 12)


## Data Preprocessing

In [11]:
#Selecting columns on which to work on
my_cols = data.select(['Survived','Pclass','Sex','Age','SibSp','Parch','Fare','Embarked'])

In [12]:
my_cols.show(5)

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 5 rows



### Dropping records with missing values

In [13]:
my_final_data = my_cols.na.drop()

### Handling Categorical Values

#### 1.Sex Column

In [14]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                               OneHotEncoder,StringIndexer)

In [15]:
#In StringIndexer we provide the inputCol i.e. Categorical Column and in output column we provide the name of our own
#Normally ColumnName concatenated with 'Index'
gender_indexer = StringIndexer(inputCol='Sex',outputCol='SexIndex')

In [16]:
#After StringIndexer, we use OneHotEncoding
gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')

#### 2.Embarked Column

In [17]:
embarked_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkedIndex')
embarked_encoder = OneHotEncoder(inputCol='EmbarkedIndex',outputCol='EmbarkedVec')

#### Applying Transformation on the data

In [32]:
transformed_data = gender_indexer.fit(my_final_data).transform(my_final_data)
transformed_data = gender_encoder.fit(transformed_data).transform(transformed_data)
transformed_data = embarked_indexer.fit(transformed_data).transform(transformed_data)
transformed_data = embarked_encoder.fit(transformed_data).transform(transformed_data)

In [33]:
transformed_data.show(5)

+--------+------+------+----+-----+-----+-------+--------+--------+-------------+-------------+-------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|SexIndex|       SexVec|EmbarkedIndex|  EmbarkedVec|
+--------+------+------+----+-----+-----+-------+--------+--------+-------------+-------------+-------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|     0.0|(1,[0],[1.0])|          0.0|(2,[0],[1.0])|
|       1|     1|female|38.0|    1|    0|71.2833|       C|     1.0|    (1,[],[])|          1.0|(2,[1],[1.0])|
|       1|     3|female|26.0|    0|    0|  7.925|       S|     1.0|    (1,[],[])|          0.0|(2,[0],[1.0])|
|       1|     1|female|35.0|    1|    0|   53.1|       S|     1.0|    (1,[],[])|          0.0|(2,[0],[1.0])|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|     0.0|(1,[0],[1.0])|          0.0|(2,[0],[1.0])|
+--------+------+------+----+-----+-----+-------+--------+--------+-------------+-------------+-------------+
only showi

In [34]:
transformed_data.columns

['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked',
 'SexIndex',
 'SexVec',
 'EmbarkedIndex',
 'EmbarkedVec']

#### Transforming data for Spark

In [35]:
#Helps in creation of 2 columns
assembler = VectorAssembler(inputCols=['Pclass','Age','SibSp','Parch','Fare','SexVec','EmbarkedVec'],
                           outputCol='features')

In [36]:
output = assembler.transform(transformed_data)

In [38]:
output.show(2)

+--------+------+------+----+-----+-----+-------+--------+--------+-------------+-------------+-------------+--------------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|SexIndex|       SexVec|EmbarkedIndex|  EmbarkedVec|            features|
+--------+------+------+----+-----+-----+-------+--------+--------+-------------+-------------+-------------+--------------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|     0.0|(1,[0],[1.0])|          0.0|(2,[0],[1.0])|[3.0,22.0,1.0,0.0...|
|       1|     1|female|38.0|    1|    0|71.2833|       C|     1.0|    (1,[],[])|          1.0|(2,[1],[1.0])|[1.0,38.0,1.0,0.0...|
+--------+------+------+----+-----+-----+-------+--------+--------+-------------+-------------+-------------+--------------------+
only showing top 2 rows



In [40]:
output.select(['features','Survived']).show(5)

+--------------------+--------+
|            features|Survived|
+--------------------+--------+
|[3.0,22.0,1.0,0.0...|       0|
|[1.0,38.0,1.0,0.0...|       1|
|(8,[0,1,4,6],[3.0...|       1|
|[1.0,35.0,1.0,0.0...|       1|
|[3.0,35.0,0.0,0.0...|       0|
+--------------------+--------+
only showing top 5 rows



In [42]:
final_data = output.select(['features','Survived'])

In [43]:
final_data.show(5)

+--------------------+--------+
|            features|Survived|
+--------------------+--------+
|[3.0,22.0,1.0,0.0...|       0|
|[1.0,38.0,1.0,0.0...|       1|
|(8,[0,1,4,6],[3.0...|       1|
|[1.0,35.0,1.0,0.0...|       1|
|[3.0,35.0,0.0,0.0...|       0|
+--------------------+--------+
only showing top 5 rows



## Model Building

#### Train Test Split

In [44]:
train_data , test_data = final_data.randomSplit([0.7,0.3],seed=101)

#### Logistic Regression Model`

In [45]:
from pyspark.ml.classification import LogisticRegression

In [46]:
log_reg = LogisticRegression(labelCol='Survived')

In [48]:
log_reg_model = log_reg.fit(train_data)

### Alternative Method (use Pipelines)

In [None]:
# from pyspark.ml import Pipeline

# log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived')

# pipeline = Pipeline(stages=[gender_indexer,embark_indexer,
#                            gender_encoder,embark_encoder,
#                            assembler,log_reg_titanic])

# train_titanic_data, test_titanic_data = my_final_data.randomSplit([0.7,.3])

# fit_model = pipeline.fit(train_titanic_data)

# results = fit_model.transform(test_titanic_data)

### Evaluation Metrics

In [53]:
#Binary Classification Evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [54]:
#Evaluating test_data
test_results = log_reg_model.evaluate(test_data)

In [56]:
test_results.predictions.show(5)

+--------------------+--------+--------------------+--------------------+----------+
|            features|Survived|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|(8,[0,1,4],[3.0,1...|       1|[-0.0739543882715...|[0.48151982489163...|       1.0|
|(8,[0,1,4],[3.0,1...|       1|[-0.0739898447385...|[0.48151097288967...|       1.0|
|(8,[0,1,4,5],[3.0...|       0|[3.02489521683134...|[0.95368622222345...|       0.0|
|(8,[0,1,4,5],[3.0...|       1|[3.18076142297133...|[0.96010384215865...|       0.0|
|(8,[0,1,4,5],[3.0...|       0|[3.62892742909344...|[0.97414175758540...|       0.0|
+--------------------+--------+--------------------+--------------------+----------+
only showing top 5 rows



In [57]:
#Creating instance of BinaryClassificationEvaluator
survived_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                             labelCol='Survived')

In [60]:
auc = survived_eval.evaluate(test_results.predictions)
auc

0.787080459770115

In [52]:
test_results.accuracy

0.8066037735849056

In [61]:
test_results.areaUnderROC

0.8596781609195401

#### Categorising Actual Label and Predicted Label

In [64]:
y_true = test_results.predictions.select(['Survived']).collect()
y_pred = test_results.predictions.select(['prediction']).collect()

In [65]:
from sklearn.metrics import classification_report, confusion_matrix

In [66]:
#Classification Report
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.80      0.90      0.85       125
           1       0.82      0.68      0.74        87

    accuracy                           0.81       212
   macro avg       0.81      0.79      0.79       212
weighted avg       0.81      0.81      0.80       212



In [67]:
#Confusion Matrix
print(confusion_matrix(y_true, y_pred))

[[112  13]
 [ 28  59]]
