In [1]:
# Load the Kaggle train/test CSV files as Spark DataFrames.
# Both reads share the same options: the first row supplies the column names
# and Spark infers each column's type from the data.
csvOptions = {'inferSchema': True, 'header': True}
train_rawData = sqlContext.read.format('csv').options(**csvOptions).load('/FileStore/tables/train.csv')
test_rawData = sqlContext.read.format('csv').options(**csvOptions).load('/FileStore/tables/test.csv')

In [2]:
#Convert Spark DataFrame to Pandas DataFrame for easily computing mean and most frequent category value of missing value columns
import pandas as pd
# toPandas() pulls the whole DataFrame to the driver in a single action and
# keeps the Spark column names, replacing the earlier count() + take(n) pair
# (two jobs per dataset) and the hand-built pandas frame.
train_rawPandas = train_rawData.toPandas()
test_rawPandas = test_rawData.toPandas()

In [3]:
from pyspark.sql.functions import isnan,isnull ,when, count
#TrainData NullValues Fixing
# Age is filled with the column mean, Embarked with its most frequent value.
mostFrequentEmbarkValue =(train_rawPandas['Embarked'].value_counts().index[0])
meanAge = float(train_rawPandas.Age.mean())
nullfixedTrain_data = train_rawData.na.fill({'Age': meanAge, 'Embarked': str(mostFrequentEmbarkValue)})

#TestData NullValues Fixing
# Same strategy, using the test set's own statistics. Dedicated names are used
# so the training values above are not silently overwritten.
testMostFrequentEmbarkValue =(test_rawPandas['Embarked'].value_counts().index[0])
testMeanAge = float(test_rawPandas.Age.mean())
nullfixedTest_data = test_rawData.na.fill({'Age': testMeanAge, 'Embarked': str(testMostFrequentEmbarkValue)})

#Confirm if data is without null entries
# Per-column NULL counts. `when(isnull(c), c)` yields the (non-null) literal
# column name for NULL rows and NULL otherwise, so count() tallies the NULLs.
# Both datasets use isnull: isnan only matches floating-point NaN and would
# report 0 for columns that still contain NULLs.
nullfixedTrain_data.select([count(when(isnull(c), c)).alias(c) for c in nullfixedTrain_data.columns]).show()
nullfixedTest_data.select([count(when(isnull(c), c)).alias(c) for c in nullfixedTest_data.columns]).show()
#Cabin column has got null entries but I have decided to drop the column for my analysis
# NOTE(review): if the test output shows NULLs in any column other than Cabin,
# that column needs filling too before it is fed to a feature pipeline.

In [4]:
#Dropping columns(features) with lesser significance on predicting Survival
# The same identifier/free-text columns are removed from both datasets.
insignificantColumns = ['PassengerId', 'Name', 'Ticket', 'Cabin']
significantColumnsTrain = nullfixedTrain_data.drop(*insignificantColumns)
significantColumnsTest = nullfixedTest_data.drop(*insignificantColumns)

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalFeatures = ["Sex", "Embarked"]
stages = []  # Pipeline stages; nothing runs until the Pipeline is fitted later
for categoricalCol in categoricalFeatures:
  indexedCol = categoricalCol + "Index"
  # StringIndexer maps each category string to a numeric index ...
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
  # ... and OneHotEncoder turns that index into a binary vector column.
  encoder = OneHotEncoder(inputCol=indexedCol, outputCol=categoricalCol + "classVec")
  stages.extend([stringIndexer, encoder])


In [6]:
numericCols = ["Pclass","Age", "SibSp", "Parch", "Fare"]
# Feature vector = one-hot encoded categorical columns followed by the numeric ones.
# A list comprehension is used instead of map(): on Python 3 map() returns an
# iterator, and `iterator + list` raises TypeError.
assemblerInputs = [c + "classVec" for c in categoricalFeatures] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [7]:
# Create a Pipeline from the stages accumulated in the previous cells
# (indexer + encoder per categorical column, then the vector assembler).
pipeline = Pipeline(stages=stages)
# Run the feature transformations on the training columns.
#  - fit() computes feature statistics as needed and returns a PipelineModel.
#  - transform() actually transforms the features, producing the DataFrame
#    (including the assembled "features" column) that the classifiers train on.
pipelineModel = pipeline.fit(significantColumnsTrain)
processedTrain = pipelineModel.transform(significantColumnsTrain)

In [8]:
#Splitting Data to compare performance of classifiers
# 80/20 random split of the processed training data; the fixed seed keeps the
# split repeatable between runs.
(PerformanceMetricTrainData, PerformanceMetricTestData) = processedTrain.randomSplit([0.8, 0.2], seed = 100)
# print(...) with a single argument behaves the same on Python 2 and Python 3,
# whereas the bare print statement is a SyntaxError on Python 3.
print(PerformanceMetricTrainData.count())
print(PerformanceMetricTestData.count())

In [9]:
#Logistic Regression
from pyspark.ml.classification import LogisticRegression
# Logistic regression estimator: predicts "Survived" from the assembled
# "features" vector, capped at 10 optimisation iterations.
lr = LogisticRegression(featuresCol="features", labelCol="Survived", maxIter=10)
# Fit on the 80% training portion of the split.
lrModel = lr.fit(PerformanceMetricTrainData)
# Score the held-out 20% portion.
predictionsLR = lrModel.transform(PerformanceMetricTestData)
#predictionsLR.select('rawPrediction').show(89,truncate=False)

In [10]:
#Decision Tree Classifier
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree limited to depth 4, trained on the training portion of the split
# and then scored on the held-out portion.
decisionTree = DecisionTreeClassifier(labelCol='Survived', maxDepth=4)
dtModel = decisionTree.fit(PerformanceMetricTrainData)
predictionsDT = dtModel.transform(PerformanceMetricTestData)

In [11]:
# RandomForestClassifier
from pyspark.ml.classification import RandomForestClassifier
# Random forest of 100 trees, trained on the training portion of the split
# and then scored on the held-out portion.
randomForest = RandomForestClassifier(labelCol='Survived', numTrees=100)
rfModel = randomForest.fit(PerformanceMetricTrainData)
predictionsRF = rfModel.transform(PerformanceMetricTestData)

In [12]:
#Multi-layer Perceptron
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import MultilayerPerceptronClassificationModel
# Layer sizes from input to output: two hidden layers (6 and 7 units) between
# an 8-unit input layer and a 2-unit output layer.
# NOTE(review): the first entry presumably has to equal the length of the
# "features" vector and the last the number of classes - confirm against the data.
layer = [8,6,7,2]
mcP = MultilayerPerceptronClassifier(layers=layer, labelCol='Survived', maxIter=100, blockSize=128, seed=1234)
# Train on the training portion of the split.
mcPmodel = mcP.fit(PerformanceMetricTrainData)

# Predict labels for the held-out portion.
mcPresult = mcPmodel.transform(PerformanceMetricTestData)


In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator
# One evaluator is shared by all four models so their scores are comparable.
# (A previous statement here built a "label"/"prediction" projection of
# predictionsLR and discarded the result; it had no effect and was removed.)
# NOTE: rawPredictionCol points at the hard 0/1 "prediction" column rather than
# a score/probability column, so the metric reflects thresholded predictions
# only. Every result DataFrame used below is read through that same column.
binaryClassEvaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol="Survived")
LRmetric = binaryClassEvaluator.evaluate(predictionsLR)
DTmetric = binaryClassEvaluator.evaluate(predictionsDT)
RFmetric = binaryClassEvaluator.evaluate(predictionsRF)
mlPCmetric = binaryClassEvaluator.evaluate(mcPresult)

In [14]:
def computeAccuracy(act,pre):
  """Return the fraction of positions at which `act` and `pre` hold equal values.

  Parameters:
    act: sequence of actual labels.
    pre: sequence of predicted labels, compared with `act` position by position.

  Returns:
    A float in [0.0, 1.0]. Two empty sequences yield 0.0 (previously this
    raised ZeroDivisionError). If the sequences differ in length, the string
    'Lists are of different size' is returned instead of a number; that
    behaviour is kept for existing callers.
  """
  if len(act) != len(pre):
    return 'Lists are of different size'
  length = len(act)
  if length == 0:
    # Nothing to compare; avoid dividing by zero.
    return 0.0
  hit = sum(1 for actual, predicted in zip(act, pre) if actual == predicted)
  # float() on both sides keeps true division under Python 2 as well.
  return float(hit)/float(length)

In [15]:
def prepareAccuracyList1(actuals,predicted):
  """Turn collected rows into two plain lists of floats.

  Parameters:
    actuals: iterable of rows exposing a `Survived` attribute.
    predicted: iterable of rows exposing a `prediction` attribute.

  Returns:
    A tuple (labels, predictions) of float lists, each in the order of its
    input iterable.
  """
  labels = []
  for row in actuals:
    labels.append(float(row.Survived))
  predictions = []
  for row in predicted:
    predictions.append(float(row.prediction))
  return labels, predictions


In [16]:
# Fetch the label and the prediction of every row in ONE collect per model.
# Collecting the two columns in separate actions doubled the number of Spark
# jobs and paired labels with predictions purely by list position across two
# independent actions; here each Row carries both fields, so the pairing is
# row-wise by construction. The same row list serves as both the "actuals" and
# the "predicted" input because prepareAccuracyList1 reads `.Survived` from the
# former and `.prediction` from the latter.
actualsLR = predictedLR = predictionsLR.select('Survived', 'prediction').collect()
actualsDT = predictedDT = predictionsDT.select('Survived', 'prediction').collect()
actualsRF = predictedRF = predictionsRF.select('Survived', 'prediction').collect()
actualsMultilayer = predictedMultilayer = mcPresult.select('Survived', 'prediction').collect()

# Convert the collected rows into plain float lists.
actualsLRList,predictedLRList =prepareAccuracyList1(actualsLR,predictedLR)
actualsDTList,predictedDTList =prepareAccuracyList1(actualsDT,predictedDT)
actualsRFList,predictedRFList =prepareAccuracyList1(actualsRF,predictedRF)
actualsMLCPList,predictedMLCPList =prepareAccuracyList1(actualsMultilayer,predictedMultilayer)

# Fraction of held-out rows each model labelled correctly.
accuracyLR = computeAccuracy(actualsLRList,predictedLRList)
accuracyDT = computeAccuracy(actualsDTList,predictedDTList)
accuracyRF = computeAccuracy(actualsRFList,predictedRFList)
accuracyMLP =computeAccuracy(actualsMLCPList,predictedMLCPList)

In [17]:
# One summary row per classifier: display name, hold-out accuracy, and the
# score produced by the shared binary evaluator.
comparisonRows = [
    ("Logistic Regression", accuracyLR, LRmetric),
    ("DecisionTree", accuracyDT, DTmetric),
    ("RandomForest", accuracyRF, RFmetric),
    ("MultilayerPerceptronClassifier", accuracyMLP, mlPCmetric),
]
# The third column is named after whatever metric the evaluator is configured with.
comparisonColumns = ["Classifier", "accuracy", binaryClassEvaluator.getMetricName()]
df1 = spark.createDataFrame(comparisonRows, comparisonColumns)

In [18]:
# Render the classifier comparison table with the notebook's display() helper.
# Plain-text alternative: df1.show(truncate=False)
display(df1)