In [7]:
import os

from pyspark.sql import SparkSession

# Location of the Titanic training CSV. Set the TITANIC_CSV environment
# variable to point elsewhere instead of editing this machine-specific default.
TITANIC_CSV = os.environ.get('TITANIC_CSV',
                             '/Users/shravaniroy/Downloads/titanic/train.csv')

# Reuse an existing Spark session if one is running, otherwise start one.
spark = (SparkSession.builder
         .appName('Examine data about passengers on the Titanic')
         .getOrCreate())

# header='true' takes column names from the first CSV row. No inferSchema is
# requested, so columns come back as strings; numeric ones are cast later.
rawData = (spark.read
           .format('csv')
           .option('header', 'true')
           .load(TITANIC_CSV))


In [8]:
# Preview the first rows of the raw CSV. limit() runs in Spark, so only five
# rows are shipped to the driver instead of collecting the whole DataFrame.
rawData.limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S


In [9]:
from pyspark.sql.functions import col

# Keep the columns of interest and cast the numeric ones to float.
# The CSV header spells the class column 'Pclass'. Reference it exactly and
# alias it to 'PClass' (the name later cells use) so this does not rely on
# Spark's case-insensitive column resolution.
dataset = rawData.select(
    col('Survived').cast('float'),
    col('Pclass').cast('float').alias('PClass'),
    col('Sex'),
    col('Age').cast('float'),
    col('Fare').cast('float'),
    col('Embarked')
)
# Preview only five rows; limit() avoids collecting everything to the driver.
dataset.limit(5).toPandas()


Unnamed: 0,Survived,PClass,Sex,Age,Fare,Embarked
0,0.0,3.0,male,22.0,7.25,S
1,1.0,1.0,female,38.0,71.283302,C
2,1.0,3.0,female,26.0,7.925,S
3,1.0,1.0,female,35.0,53.099998,S
4,0.0,3.0,male,35.0,8.05,S


In [10]:
# Turn '?' placeholders into nulls, then discard any row with a null in any column.
dataset = dataset.na.replace('?', None).na.drop(how='any')


In [15]:
from pyspark.ml.feature import StringIndexer
# Convert each categorical string column into a new numeric index column.
# By default StringIndexer orders labels by descending frequency (most frequent -> 0).
# handleInvalid='keep' gives labels unseen at fit time an extra index instead of failing.
for source_col, indexed_col in [('Sex', 'Gender'), ('Embarked', 'Boarded')]:
    indexer = StringIndexer(inputCol=source_col,
                            outputCol=indexed_col,
                            handleInvalid='keep')
    dataset = indexer.fit(dataset).transform(dataset)

# Preview only five rows; limit() avoids collecting everything to the driver.
dataset.limit(5).toPandas()

Unnamed: 0,Survived,PClass,Sex,Age,Fare,Embarked,Gender,Boarded
0,0.0,3.0,male,22.0,7.25,S,0.0,0.0
1,1.0,1.0,female,38.0,71.283302,C,1.0,1.0
2,1.0,3.0,female,26.0,7.925,S,1.0,0.0
3,1.0,1.0,female,35.0,53.099998,S,1.0,0.0
4,0.0,3.0,male,35.0,8.05,S,0.0,0.0


In [16]:
# The raw string columns are now encoded as Gender / Boarded, so drop them.
dataset = dataset.drop('Sex', 'Embarked')
# Preview only five rows; limit() avoids collecting everything to the driver.
dataset.limit(5).toPandas()

Unnamed: 0,Survived,PClass,Age,Fare,Gender,Boarded
0,0.0,3.0,22.0,7.25,0.0,0.0
1,1.0,1.0,38.0,71.283302,1.0,1.0
2,1.0,3.0,26.0,7.925,1.0,0.0
3,1.0,1.0,35.0,53.099998,1.0,0.0
4,0.0,3.0,35.0,8.05,0.0,0.0


In [20]:
# Columns combined into the clustering feature vector, in vector order.
# NOTE(review): 'Survived' is the outcome variable, so it also shapes the
# clusters; drop it here if clusters should be built from passenger traits only.
requiredFeatures = [
    'Survived', 'PClass', 'Age',
    'Fare', 'Gender', 'Boarded'
]
# VectorAssembler combines the required feature columns into a single vector column named 'features'. It is a Spark Transformer: it takes a DataFrame and returns a new DataFrame with that column appended.

In [21]:
from pyspark.ml.feature import VectorAssembler

# Pack the feature columns into one vector column; KMeans reads 'features' by default.
assembler = VectorAssembler(
    inputCols=requiredFeatures,
    outputCol='features'
)

In [22]:
# New DataFrame = dataset plus the assembled 'features' vector column.
transformed_data = assembler.transform(dataset)

In [23]:
# Preview only five rows; limit() avoids collecting everything to the driver.
transformed_data.limit(5).toPandas()

Unnamed: 0,Survived,PClass,Age,Fare,Gender,Boarded,features
0,0.0,3.0,22.0,7.25,0.0,0.0,"[0.0, 3.0, 22.0, 7.25, 0.0, 0.0]"
1,1.0,1.0,38.0,71.283302,1.0,1.0,"[1.0, 1.0, 38.0, 71.2833023071289, 1.0, 1.0]"
2,1.0,3.0,26.0,7.925,1.0,0.0,"[1.0, 3.0, 26.0, 7.925000190734863, 1.0, 0.0]"
3,1.0,1.0,35.0,53.099998,1.0,0.0,"[1.0, 1.0, 35.0, 53.099998474121094, 1.0, 0.0]"
4,0.0,3.0,35.0,8.05,0.0,0.0,"[0.0, 3.0, 35.0, 8.050000190734863, 0.0, 0.0]"


In [41]:
from pyspark.ml.clustering import KMeans
# Number of clusters to find.
NUM_CLUSTERS = 8
# Fixes the random initialisation of the cluster centers, so re-running on the
# same data should reproduce the same clustering.
SEED = 1

# NOTE(review): the features are not standardised, so Fare and Age (far larger
# ranges than the 0/1 and 1-3 columns) dominate the Euclidean distance.
# Consider a StandardScaler before clustering.
kmeans = KMeans(k=NUM_CLUSTERS, seed=SEED)
model = kmeans.fit(transformed_data)

In [42]:
# Apply the fitted model back to the training data. The result is the input
# DataFrame plus a 'prediction' column holding each row's cluster index.
clusterData = model.transform(transformed_data)


In [43]:
# ClusteringEvaluator scores how well the rows are clustered (silhouette with
# squared Euclidean distance by default).
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(clusterData)
# Silhouette compares how close each point is to its own cluster versus the
# nearest other cluster. It ranges from -1 to 1; values near 1 are best.

# Build one string: under Python 2, print with two arguments shows a tuple.
print('Silhouette with squared euclidean distance={}'.format(silhouette))

('Silhouette with squared euclidean distance=', 0.6074848997883804)


In [44]:
# Print the center of every cluster (one per cluster found by KMeans).
# Each center has one coordinate per assembled feature (6 here), in the same
# order as requiredFeatures: Survived, PClass, Age, Fare, Gender, Boarded.
centers = model.clusterCenters()
print('Cluster Centers:')
for cluster_id, center in enumerate(centers):
    # The list index is the cluster id used in the 'prediction' column.
    print('{}: {}'.format(cluster_id, center))

Cluster Centers:
[ 0.2755418   2.73065015 25.43343653 10.4654787   0.2879257   0.21671827]
[  0.73333333   1.          30.33333333 239.99193726   0.73333333
   0.53333333]
[  0.75         1.          31.89       132.95237487   0.64285714
   0.46428571]
[ 0.59259259  1.37037037 35.98148148 52.62068487  0.42592593  0.31481481]
[1.00000000e+00 1.00000000e+00 3.53333333e+01 5.12329224e+02
 3.33333333e-01 1.00000000e+00]
[ 0.31543624  2.0738255  46.03355705 19.98168321  0.27516779  0.19463087]
[ 0.56179775  2.56179775  8.74438202 26.98651588  0.49438202  0.2247191 ]
[ 0.68627451  1.09803922 36.39215686 81.40024537  0.54901961  0.50980392]


In [45]:
# Preview only five rows; limit() avoids collecting everything to the driver.
clusterData.limit(5).toPandas()

Unnamed: 0,Survived,PClass,Age,Fare,Gender,Boarded,features,prediction
0,0.0,3.0,22.0,7.25,0.0,0.0,"[0.0, 3.0, 22.0, 7.25, 0.0, 0.0]",0
1,1.0,1.0,38.0,71.283302,1.0,1.0,"[1.0, 1.0, 38.0, 71.2833023071289, 1.0, 1.0]",7
2,1.0,3.0,26.0,7.925,1.0,0.0,"[1.0, 3.0, 26.0, 7.925000190734863, 1.0, 0.0]",0
3,1.0,1.0,35.0,53.099998,1.0,0.0,"[1.0, 1.0, 35.0, 53.099998474121094, 1.0, 0.0]",3
4,0.0,3.0,35.0,8.05,0.0,0.0,"[0.0, 3.0, 35.0, 8.050000190734863, 0.0, 0.0]",0


In [46]:
# Import only what is used: a wildcard import from pyspark.sql.functions
# shadows Python builtins such as sum, min, max, abs and round.
from pyspark.sql.functions import avg, count

# Baseline: the mean of every clustering feature over the whole cleaned dataset.
dataset.select([avg(name) for name in requiredFeatures]).toPandas()

Unnamed: 0,avg(Survived),avg(PClass),avg(Age),avg(Fare),avg(Gender),avg(Boarded)
0,0.404494,2.240169,29.642093,34.567251,0.363764,0.261236


In [47]:
# Per-cluster means of the same features, to compare with the dataset-wide
# averages. Grouping by 'prediction' yields one row per cluster, and
# count('prediction') is the number of rows assigned to that cluster.
cluster_aggs = [avg(name) for name in requiredFeatures]
cluster_aggs.append(count('prediction'))
(clusterData
    .groupBy('prediction')
    .agg(*cluster_aggs)
    .orderBy('prediction')
    .toPandas())


Unnamed: 0,prediction,avg(Survived),avg(PClass),avg(Age),avg(Fare),avg(Gender),avg(Boarded),count(prediction)
0,0,0.275542,2.73065,25.433437,10.465479,0.287926,0.216718,323
1,1,0.733333,1.0,30.333333,239.991937,0.733333,0.533333,15
2,2,0.75,1.0,31.89,132.952375,0.642857,0.464286,28
3,3,0.592593,1.37037,35.981481,52.620685,0.425926,0.314815,54
4,4,1.0,1.0,35.333333,512.329224,0.333333,1.0,3
5,5,0.315436,2.073826,46.033557,19.981683,0.275168,0.194631,149
6,6,0.561798,2.561798,8.744382,26.986516,0.494382,0.224719,89
7,7,0.686275,1.098039,36.392157,81.400245,0.54902,0.509804,51


In [48]:
# Cluster 0: survival rate is below average.
# Passengers are less likely to be in first class (mean PClass is above the overall mean).
# Fare paid is below average.
# Gender skews male (0 = male).

# Cluster 1: survival rate is high.
# All passengers are in first class.
# Fare paid is very high.
# Gender is mostly female (1 = female).


In [40]:
# Show only the rows assigned to a single cluster (here cluster 1).
# NOTE(review): this cell's execution count (40) is lower than the KMeans fit
# (41), so its saved output came from an earlier model; it shows 10 rows while
# the current cluster 1 has 15. Re-run to refresh.
clusterData.where(clusterData['prediction'] == 1).toPandas()

Unnamed: 0,Survived,PClass,Age,Fare,Gender,Boarded,features,prediction
0,0.0,1.0,19.0,263.0,0.0,0.0,"[0.0, 1.0, 19.0, 263.0, 0.0, 0.0]",1
1,1.0,1.0,23.0,263.0,1.0,0.0,"[1.0, 1.0, 23.0, 263.0, 1.0, 0.0]",1
2,0.0,1.0,24.0,247.520798,0.0,1.0,"[0.0, 1.0, 24.0, 247.5207977294922, 0.0, 1.0]",1
3,1.0,1.0,50.0,247.520798,1.0,1.0,"[1.0, 1.0, 50.0, 247.5207977294922, 1.0, 1.0]",1
4,1.0,1.0,18.0,262.375,1.0,1.0,"[1.0, 1.0, 18.0, 262.375, 1.0, 1.0]",1
5,1.0,1.0,31.0,164.866699,1.0,0.0,"[1.0, 1.0, 31.0, 164.86669921875, 1.0, 0.0]",1
6,1.0,1.0,24.0,263.0,1.0,0.0,"[1.0, 1.0, 24.0, 263.0, 1.0, 0.0]",1
7,0.0,1.0,27.0,211.5,0.0,1.0,"[0.0, 1.0, 27.0, 211.5, 0.0, 1.0]",1
8,1.0,1.0,42.0,227.524994,1.0,1.0,"[1.0, 1.0, 42.0, 227.52499389648438, 1.0, 1.0]",1
9,0.0,1.0,64.0,263.0,0.0,0.0,"[0.0, 1.0, 64.0, 263.0, 0.0, 0.0]",1
