In [1]:
## Imports required
from pyspark.sql import SparkSession,SQLContext,types
from pyspark.sql.functions import expr

spark = SparkSession.builder \
.master('local') \
.appName('Data cleaning') \
.getOrCreate()

import plotly.plotly as py
import plotly.graph_objs as go
import plotly.figure_factory as ff
import pandas as pd
import requests
requests.packages.urllib3.disable_warnings()

import pandas as pd
from handyspark import *
from pyspark.sql.types import FloatType

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer,VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [2]:
## load the dataset
file = '/Users/SreeHarsha/Downloads/titanicdataset/titanic_data.csv'
df = spark.read.format("csv").option("header", "true").load(file)

In [3]:
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [4]:
from pyspark.sql.functions import isnan, when, count, col,size

# Count null values in each column of dataset
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [5]:
# Ratio of missing values in Age
150/df.select('Age').count()

0.16835016835016836

In [6]:
# Ratio of missing values in Cabin
599/df.select('Cabin').count()

0.6722783389450057

In [7]:
## Convert the datatype from string to Float( Pyspark ml evaluation metrics handle float values)
df = df.withColumn('SibSp',df['SibSp'].cast(FloatType()))
df =  df.withColumn('Parch',df['Parch'].cast(FloatType()))
df=  df.withColumn('Pclass',df['Pclass'].cast(FloatType()))
df =  df.withColumn('Age',df['Age'].cast(FloatType()))
df =  df.withColumn('Fare',df['Fare'].cast(FloatType()))

In [8]:
## Combine parch and Sibsp variable into new variable Travelbuds 
df = df.withColumn('travelbuds', df["Parch"]+df['Sibsp'])

In [9]:
df =  df.withColumn('Survived',df['Survived'].cast(FloatType()))

In [10]:
## Drop unnecessary columns from the dataset
df= df.drop('Parch')
df= df.drop('Sibsp')
df = df.drop('PassengerId')
df = df.drop('Ticket')
df = df.drop('Name')
df = df.drop('Cabin')

In [11]:
df.show()

+--------+------+------+----+-------+--------+------------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|travelbdfuds|
+--------+------+------+----+-------+--------+------------+
|     0.0|   3.0|  male|22.0|   7.25|       S|         1.0|
|     1.0|   1.0|female|38.0|71.2833|       C|         1.0|
|     1.0|   3.0|female|26.0|  7.925|       S|         0.0|
|     1.0|   1.0|female|35.0|   53.1|       S|         1.0|
|     0.0|   3.0|  male|35.0|   8.05|       S|         0.0|
|     0.0|   3.0|  male|null| 8.4583|       Q|         0.0|
|     0.0|   1.0|  male|54.0|51.8625|       S|         0.0|
|     0.0|   3.0|  male| 2.0| 21.075|       S|         4.0|
|     1.0|   3.0|female|27.0|11.1333|       S|         2.0|
|     1.0|   2.0|female|14.0|30.0708|       C|         1.0|
|     1.0|   3.0|female| 4.0|   16.7|       S|         2.0|
|     1.0|   1.0|female|58.0|  26.55|       S|         0.0|
|     0.0|   3.0|  male|20.0|   8.05|       S|         0.0|
|     0.0|   3.0|  male|39.0| 31.275|   

In [12]:
## Convert the spark dataframe to Handyframe for easier plotting functionalities
hdf = df.toHandy()

In [17]:
data = [go.Histogram(x = hdf.cols['Age'][:])]

In [13]:
# Based on distribution, we use Median of age to impute missing values due to the skew
hdf.cols['Age'].median()

Age    28.0
Name: median, dtype: float32

In [14]:
# impute Embarked with mode as only 2 missing values
hdf.cols['Embarked'].mode()

Embarked    S
Name: mode, dtype: object

In [15]:
# imputing
hdf = hdf.fill(continuous = ['Age'],strategy = ['median'])

In [16]:
hdf = hdf.fill(categorical = ['Embarked'])

In [None]:
py.iplot(data)

In [None]:
## Plotting distributions of Survivors by age
# Add histogram data
x1 = hdf.filter(hdf['Survived']== 1).cols['Age'][:]
x2 = hdf.filter(hdf['Survived']== 0).cols['Age'][:]

survived = go.Histogram(
    x=x1,
    name = 'survived',
    opacity=0.75
)
died = go.Histogram(
    x=x2,
    name = 'died',
    opacity=0.75
)

data = [survived, died]
layout = go.Layout(barmode='overlay')
fig = go.Figure(data=data, layout=layout)

py.iplot(fig)

In [None]:
## Number of survivors by Passenger class
data = [go.Bar(x = hdf.cols['Pclass'][:],
               y = hdf.cols['Survived'][:])]

py.iplot(data)

In [18]:
## Encode categorical variables using onehotencoder
stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(hdf)
indexed = model.transform(hdf)

encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
encoded = encoder.transform(indexed)

stringIndexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")
model = stringIndexer.fit(encoded)
indexed = model.transform(encoded)

encoder = OneHotEncoder(inputCol="SexIndex", outputCol="SexVec")
encoded = encoder.transform(indexed)


In [19]:
##Combine all features into a single vector. This vector acts as input for ml algorithm
features = encoded.select([c for c in ['Survived','Pclass','Age','Fare','travelbuds','EmbarkedIndex','EmbarkedVec','SexIndex','SexVec'] ])

vecAssembler = VectorAssembler(inputCols=features.columns, outputCol="features")
features_vec = vecAssembler.transform(features)


In [20]:
##The target variable in spark ml is always named 'label'
features_vec = features_vec.withColumnRenamed("Survived", "label")
features_data = features_vec.select("label", "features")

In [21]:
## Splitting the features into training and test data
feat_train, feat_test = features_data.randomSplit([0.9, 0.1], seed=12345)

In [22]:
## Defing the logistic regression model and Cross validator
lr = LogisticRegression(maxIter=20)

pipeline = Pipeline(stages=[lr])

paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

cross_val = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)  

model = cross_val.fit(feat_train)


In [23]:
results = model.transform(feat_test)

In [24]:
## Prediction and metrics
predictionLabels = results.select("prediction", "label")
metrics = BinaryClassificationMetrics(predictionLabels.rdd)
metrics.areaUnderROC

1.0

In [25]:
metrics.areaUnderPR

1.0