### A very simple example of how to use PySpark ML on Kaggle's Titanic dataset

The problem statement and dataset can be found here: https://www.kaggle.com/c/titanic

### Prerequisites:
  * Basic understanding of Python https://www.kaggle.com/learn/python
  * Pandas https://www.kaggle.com/learn/pandas
  * Machine learning https://www.kaggle.com/learn/machine-learning

  * Apache Spark 

      https://docs.databricks.com/spark/latest/gentle-introduction/gentle-intro.html
      
      https://docs.databricks.com/spark/latest/gentle-introduction/gentle-intro.html#gentle-introduction-to-apache-spark
      
      https://docs.databricks.com/spark/latest/gentle-introduction/for-data-scientists.html

#### Importing the necessary libraries

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer


#### Starting Point: SparkSession

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder:

In [7]:
spark = SparkSession \
    .builder \
    .appName("Spark ML example on titanic data ") \
    .getOrCreate()

Next, we have to import the dataset. In this notebook, I import the dataset from S3 (https://docs.databricks.com/spark/latest/data-sources/aws/amazon-s3.html).

Alternatively, you can upload the dataset directly to Databricks (https://docs.databricks.com/user-guide/importing-data.html#import-data).

In [9]:
s3_bucket_path = "/mnt/lp-dataset/titanic/train.csv"

In [10]:
titanic_df = spark.read.csv(s3_bucket_path, header=True, inferSchema=True)

In [11]:
passengers_count = titanic_df.count()

In [12]:
print(passengers_count)

Let's view a few rows

In [14]:
titanic_df.show(5)

Summary statistics of the data

In [16]:
titanic_df.describe().show()

Let's look at the schema of our dataset

In [18]:
titanic_df.printSchema()

Let's select a few features

In [20]:
titanic_df.select("Survived","Pclass","Embarked").show()

### Let's do some simple exploratory data analysis (EDA)

How many passengers survived?

In [23]:
titanic_df.groupBy("Survived").count().show()

Out of the 891 passengers in the dataset, only 342 survived.

We need to dig deeper to get better insights from the data and see which categories of passengers survived and which did not. We will check the survival rate across different features of the dataset, such as Sex, Port of Embarkation and Age.

Let's check survival by Sex

In [27]:
titanic_df.groupBy("Sex","Survived").count().show()

This looks interesting. There were far more men on the ship than women, yet roughly twice as many women survived as men.

In [29]:
titanic_df.groupBy("Pclass","Survived").count().show()

Grouping by Pclass, we can clearly see that passengers in Pclass 1 survived at a much higher rate, which suggests they were given high priority during the rescue. Even though Pclass 3 had a lot more passengers, only a small fraction of them survived.
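
Raw counts can mislead when groups differ in size, so it helps to turn grouped counts like the ones above into survival rates (survivors divided by group size). The following is a minimal plain-Python sketch of that arithmetic. The rows are hypothetical toy data, not the Titanic dataset.

```python
from collections import Counter

# Hypothetical toy rows of (group, survived); stand-ins for real (Sex, Survived) pairs
rows = [
    ("female", 1), ("female", 1), ("female", 0),
    ("male", 0), ("male", 0), ("male", 1), ("male", 0),
]

def survival_rate_by_group(pairs):
    """Return {group: fraction of rows in that group with survived == 1}."""
    totals = Counter(group for group, _ in pairs)
    survivors = Counter(group for group, survived in pairs if survived == 1)
    return {group: survivors[group] / totals[group] for group in totals}

rates = survival_rate_by_group(rows)
for group, rate in sorted(rates.items()):
    print(f"{group}: {rate:.2f}")
```

Survived is coded 0/1, so the survival rate of a group equals the mean of Survived in that group. In PySpark you could therefore write `titanic_df.groupBy("Sex").agg(avg("Survived")).show()`, with `avg` imported from `pyspark.sql.functions`.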

#### Checking Null values

In [32]:
# Return a list of (column name, null count) tuples for every column that contains nulls
def null_value_count(df):
  null_columns_counts = []
  for k in df.columns:
    null_rows = df.where(col(k).isNull()).count()
    if null_rows > 0:
      null_columns_counts.append((k, null_rows))
  return null_columns_counts

In [33]:
# Collect (column, null count) pairs for the columns that contain nulls
null_columns_count_list = null_value_count(titanic_df)


In [34]:
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

The Age feature has 177 null values. Cabin and Embarked also contain nulls.
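
`null_value_count` above runs one Spark `count()` job per column. The counting itself only needs a single pass over the rows. The following is a plain-Python sketch of that single-pass idea on hypothetical toy records, with `None` standing in for a Spark null.

```python
# Hypothetical toy records; None plays the role of a Spark null
records = [
    {"Age": 22.0, "Cabin": None, "Embarked": "S"},
    {"Age": None, "Cabin": "C85", "Embarked": "C"},
    {"Age": None, "Cabin": None, "Embarked": None},
]

def null_counts(rows):
    """Count None values per column in one pass; only columns with nulls appear."""
    counts = {}
    for row in rows:
        for column, value in row.items():
            if value is None:
                counts[column] = counts.get(column, 0) + 1
    return counts

print(null_counts(records))
```

In PySpark the same single-pass idea is commonly written as one `select` holding a `count(when(col(c).isNull(), c))` expression per column, with `count` imported from `pyspark.sql.functions`. That is a suggested alternative, not what this notebook does.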

In [36]:
mean_age = titanic_df.select(mean('Age')).collect()[0][0]
print(mean_age)

To replace these null values, we could assign them the mean age of the dataset. The problem is that ages vary widely across passengers: we shouldn't assign a 4-year-old child the overall mean age of about 29.7 years.

Instead, we can use the Name feature. Each name contains a salutation such as Mr or Mrs, so we can assign each salutation group the mean age of that group.

In [39]:
titanic_df = titanic_df.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))

Using the regex `([A-Za-z]+)\.` we extract the initial (salutation) from the Name. The pattern matches a run of letters (A-Z or a-z) followed by a . (dot). Group 1 captures only the letters, so the extracted Initial does not include the dot.
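
The extraction can be checked outside Spark with Python's `re` module, using the same pattern. The following is a minimal sketch. The names follow the dataset's "Surname, Title. Given names" format. An empty string is returned when nothing matches, mirroring `regexp_extract`. Python and Java regex engines differ in some details, but they agree on this simple pattern.

```python
import re

# Same pattern as the regexp_extract call above: a run of letters followed by a dot
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return group 1 of the first letters-then-dot match, or "" if none matches."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

names = [
    "Braund, Mr. Owen Harris",
    "Heikkinen, Miss. Laina",
    "Palsson, Master. Gosta Leonard",
]
print([extract_title(n) for n in names])
```

The surname is never matched, because it is followed by a comma rather than a dot.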

In [41]:
titanic_df.show()

In [42]:
titanic_df.select("Initial").distinct().show()


Some Initials are rare titles or French forms, such as Mlle (Mademoiselle) and Mme (Madame). The cell below maps them, along with the other rare titles, onto Miss, Mr, Mrs or Other.

In [44]:
# Restrict the replacement to the Initial column so no other string column is touched
titanic_df = titanic_df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'],
               subset=['Initial'])


In [45]:
titanic_df.select("Initial").distinct().show()


Let's check the average age for each Initial

In [47]:
titanic_df.groupby('Initial').avg('Age').collect()

Let's impute the missing Age values with the (rounded) average age of each Initial group

In [49]:
# Rounded average age per Initial, taken from the output of the previous cell
mean_age_by_initial = {"Miss": 22, "Other": 46, "Master": 5, "Mr": 33, "Mrs": 36}
for initial, age in mean_age_by_initial.items():
  # Fill Age only where it is null and the passenger has this Initial
  titanic_df = titanic_df.withColumn("Age", when((col("Initial") == initial) & col("Age").isNull(), age).otherwise(col("Age")))
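
The ages used in the cell above are rounded group averages, typed in by hand. The same group-mean imputation can also be computed directly from the data. The following is a plain-Python sketch on hypothetical (title, age) rows, where `None` marks a missing age.

```python
from collections import defaultdict

# Hypothetical toy (title, age) rows; None marks a missing age
passengers = [
    ("Mr", 30.0), ("Mr", 40.0), ("Mr", None),
    ("Master", 4.0), ("Master", 6.0), ("Master", None),
    ("Mrs", 35.0), ("Mrs", None),
]

def impute_by_group_mean(rows):
    """Replace missing ages with the mean age of rows that share the same title."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for title, age in rows:
        if age is not None:
            sums[title] += age
            counts[title] += 1
    means = {title: sums[title] / counts[title] for title in counts}
    # A title with no known ages at all would raise KeyError here; real data may need a fallback
    return [(title, age if age is not None else means[title]) for title, age in rows]

filled = impute_by_group_mean(passengers)
print(filled)
```

Computing the means from the data avoids copying numbers by hand. It also keeps the imputation in sync if the title mapping changes.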


Check the imputation

In [51]:
titanic_df.filter(titanic_df.Age==46).select("Initial").show()


In [52]:
titanic_df.select("Age").show()

The Embarked feature has only two missing values. Let's check the values within Embarked.

In [54]:
titanic_df.groupBy("Embarked").count().show()

Most passengers boarded at "S" (Southampton), so we can impute the missing values with "S".

In [56]:
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})
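
The cell above hard-codes "S" because it is the most frequent value (the mode). Choosing the mode automatically can be sketched in plain Python as follows, using a hypothetical list of ports with `None` for missing values.

```python
from collections import Counter

# Hypothetical toy Embarked values; None marks a missing port
embarked = ["S", "C", "S", None, "Q", "S", None]

def fill_with_mode(values):
    """Replace None with the most frequent non-missing value."""
    mode, _ = Counter(v for v in values if v is not None).most_common(1)[0]
    return [mode if v is None else v for v in values]

print(fill_with_mode(embarked))
```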


We can drop the Cabin feature, as it has a lot of null values.

In [58]:
titanic_df = titanic_df.drop("Cabin")

In [59]:
titanic_df.printSchema()

We can create two new features, "Family_Size" and "Alone", and analyse them. Family_Size is the sum of Parch (parents/children) and SibSp (siblings/spouses). Alone is 1 when Family_Size is 0, and 0 otherwise. Together they let us check whether the survival rate has anything to do with the family size of the passengers.

In [61]:
titanic_df = titanic_df.withColumn("Family_Size",col('SibSp')+col('Parch'))

In [62]:
titanic_df.groupBy("Family_Size").count().show()

In [63]:
titanic_df = titanic_df.withColumn('Alone',lit(0))


In [64]:
titanic_df = titanic_df.withColumn("Alone",when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))
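
Per row, the two cells above compute the following. This is a plain-Python sketch on hypothetical (SibSp, Parch) pairs. Note that Family_Size, as defined here, counts relatives only and excludes the passenger.

```python
# Hypothetical toy rows of (SibSp, Parch)
rows = [(1, 0), (0, 0), (3, 2), (0, 1)]

def family_features(sibsp, parch):
    """Return (Family_Size, Alone) as defined in this notebook."""
    family_size = sibsp + parch            # relatives aboard, excluding the passenger
    alone = 1 if family_size == 0 else 0   # 1 when travelling without relatives
    return family_size, alone

features = [family_features(s, p) for s, p in rows]
print(features)
```

In PySpark, the two-step `lit(0)` followed by `when(...)` can also be written as a single expression: `when(col("Family_Size") == 0, 1).otherwise(0)`.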

In [65]:
titanic_df.columns

Let's convert the Sex, Embarked and Initial columns from strings to numeric indices using StringIndexer

In [67]:
# The Pipeline fits each StringIndexer, so there is no need to fit them individually first
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ["Sex","Embarked","Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)
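
By default, StringIndexer orders labels by frequency (`stringOrderType="frequencyDesc"`), so the most frequent label gets index 0.0. The following is a plain-Python sketch of that mapping on hypothetical values. Ties are broken alphabetically here, which is what recent Spark documentation describes, but treat the tie-breaking as an assumption for your Spark version.

```python
from collections import Counter

def frequency_index(values):
    """Map each label to a float index, most frequent label -> 0.0.

    Alphabetical tie-breaking is an assumption modelled on StringIndexer's
    documented frequencyDesc behaviour in recent Spark versions.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical toy Sex column
sex = ["male", "female", "male", "male", "female"]
mapping = frequency_index(sex)
print(mapping)
print([mapping[v] for v in sex])
```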

In [68]:
titanic_df.show()

In [69]:
titanic_df.printSchema()

Drop columns which are not required

In [71]:
titanic_df = titanic_df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial")

In [72]:
titanic_df.show()

Let's assemble all feature columns into a single vector column

In [74]:
# Use every column except the label "Survived" as a feature
feature = VectorAssembler(inputCols=[c for c in titanic_df.columns if c != "Survived"], outputCol="features")
feature_vector = feature.transform(titanic_df)
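
Conceptually, VectorAssembler concatenates the chosen input columns, in order, into one numeric vector per row. The following is a plain-Python sketch on a single hypothetical row. The row uses the same column names as titanic_df after the drop step, but the values are made up for illustration.

```python
# Hypothetical single row after the drop step (column name -> value)
row = {
    "Survived": 0, "Pclass": 3, "Age": 22.0, "SibSp": 1, "Parch": 0,
    "Fare": 7.25, "Family_Size": 1, "Alone": 0,
    "Sex_index": 0.0, "Embarked_index": 0.0, "Initial_index": 0.0,
}

def assemble(record, input_cols):
    """Concatenate the chosen columns, in order, into one list of floats."""
    return [float(record[c]) for c in input_cols]

feature_cols = [c for c in row if c != "Survived"]  # everything except the label
features = assemble(row, feature_cols)
print(features)
```

Because the column order defines the vector layout, the label must be excluded explicitly rather than by position. Otherwise a later change to the column order could silently leak Survived into the features.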

In [75]:
feature_vector.show()

Now that the data is all set, let's split it into training and test sets. I'll use an 80%/20% split.

In [77]:
(trainingData, testData) = feature_vector.randomSplit([0.8, 0.2],seed = 11)
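
randomSplit assigns rows to the splits at random, so the resulting sizes are only approximately 80% and 20%. The exact split can also depend on the seed and on how the data is partitioned. The following plain-Python sketch shows per-row weighted assignment. It illustrates the idea only and will not reproduce Spark's actual split for seed 11.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability proportional to its weight."""
    total = sum(weights)
    cumulative, acc = [], 0.0
    for w in weights:
        acc += w / total
        cumulative.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(cumulative):
            # The last split also catches any floating-point rounding at the top end
            if r < bound or i == len(cumulative) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(891)), [0.8, 0.2], seed=11)
print(len(train), len(test))
```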

### Modelling

Here is a list of a few classification algorithms available in Spark ML:

  * LogisticRegression
  * DecisionTreeClassifier
  * RandomForestClassifier
  * GBTClassifier (gradient-boosted trees)
  * NaiveBayes
  * LinearSVC (linear support vector machine)

#### LogisticRegression

In [81]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
# Train the model
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

Let's evaluate how well LogisticRegression is doing

In [83]:
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))
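
With `metricName="accuracy"` (and no weight column), MulticlassClassificationEvaluator reports the fraction of rows whose prediction equals the label. The test error printed above is simply 1 - accuracy. The following is a plain-Python sketch of that arithmetic on hypothetical labels and predictions.

```python
def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Hypothetical labels (ints, like Survived) and predictions (floats, like Spark's prediction column)
survived = [1, 0, 0, 1, 0]
prediction = [1.0, 0.0, 1.0, 1.0, 0.0]
acc = accuracy(survived, prediction)
print(f"Accuracy = {acc:g}, Test Error = {1.0 - acc:g}")
```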

#### DecisionTreeClassifier

In [85]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(trainingData)
dt_prediction = dt_model.transform(testData)
dt_prediction.select("prediction", "Survived", "features").show()


Let's evaluate how well DecisionTreeClassifier is doing

In [87]:
dt_accuracy = evaluator.evaluate(dt_prediction)
print("Accuracy of DecisionTreeClassifier is = %g"% (dt_accuracy))
print("Test Error of DecisionTreeClassifier = %g " % (1.0 - dt_accuracy))


#### RandomForestClassifier

In [89]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(trainingData)
rf_prediction = rf_model.transform(testData)
rf_prediction.select("prediction", "Survived", "features").show()

Let's evaluate how well RandomForestClassifier is doing

In [91]:
rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForestClassifier is = %g"% (rf_accuracy))
print("Test Error of RandomForestClassifier  = %g " % (1.0 - rf_accuracy))

#### Gradient-boosted tree classifier

In [93]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="Survived", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(trainingData)
gbt_prediction = gbt_model.transform(testData)
gbt_prediction.select("prediction", "Survived", "features").show()


Let's evaluate how well the gradient-boosted tree classifier is doing

In [95]:
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Accuracy of Gradient-boosted tree classifier is = %g" % (gbt_accuracy))
print("Test Error of Gradient-boosted tree classifier = %g " % (1.0 - gbt_accuracy))

#### NaiveBayes

In [98]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(labelCol="Survived", featuresCol="features")
nb_model = nb.fit(trainingData)
nb_prediction = nb_model.transform(testData)
nb_prediction.select("prediction", "Survived", "features").show()


Let's evaluate how well NaiveBayes is doing

In [100]:
nb_accuracy = evaluator.evaluate(nb_prediction)
print("Accuracy of NaiveBayes is  = %g"% (nb_accuracy))
print("Test Error of NaiveBayes  = %g " % (1.0 - nb_accuracy))

#### Support Vector Machine

In [102]:
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(labelCol="Survived", featuresCol="features")
svm_model = svm.fit(trainingData)
svm_prediction = svm_model.transform(testData)
svm_prediction.select("prediction", "Survived", "features").show()


Let's evaluate how well the Support Vector Machine is doing

In [104]:
svm_accuracy = evaluator.evaluate(svm_prediction)
print("Accuracy of Support Vector Machine is = %g"% (svm_accuracy))
print("Test Error of Support Vector Machine = %g " % (1.0 - svm_accuracy))

How can we increase the accuracy of a model?
  * Add new features or drop existing ones, then retrain the model
  * Tune the ML algorithm's hyperparameters (https://spark.apache.org/docs/latest/ml-tuning.html)
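
The tuning guide linked above describes ParamGridBuilder and CrossValidator. Together they score every combination in a parameter grid using k-fold cross-validation. The following plain-Python sketch shows those two ingredients: grid expansion and fold assignment. It is an illustration only. Spark's CrossValidator assigns folds randomly using a seed, not with the round-robin scheme used here. `regParam` and `elasticNetParam` are real LogisticRegression parameters, but the values in the grid are arbitrary.

```python
from itertools import product

def param_grid(grid):
    """Expand {param: [values]} into a list of {param: value} combinations."""
    names = sorted(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]

def k_fold_indices(n_rows, k):
    """Yield (train_idx, validation_idx) pairs; every row is validated exactly once."""
    folds = [list(range(i, n_rows, k)) for i in range(k)]  # round-robin fold assignment
    for i in range(k):
        validation = folds[i]
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, validation

# Arbitrary example grid over two LogisticRegression parameters
grid = param_grid({"regParam": [0.0, 0.1], "elasticNetParam": [0.0, 0.5, 1.0]})
print(len(grid))  # 6 combinations, each would be scored on every fold
for train_idx, val_idx in k_fold_indices(10, 3):
    print(len(train_idx), len(val_idx))
```

The cost grows as (number of combinations) × k model fits, so small grids are a sensible starting point.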

### Reference

https://spark.apache.org/docs/latest/ml-classification-regression.html