In [None]:
# findspark.init() is called before any pyspark import in this notebook.
import findspark
findspark.init()

In [None]:
import pandas as pd

# No column-width limit when pandas frames are rendered.
pd.options.display.max_colwidth = None

In [None]:
import os

# Set the submit arguments in the process environment before the session is built.
os.environ.update({'PYSPARK_SUBMIT_ARGS': ' pyspark-shell'})

In [None]:
from pyspark.sql.session import SparkSession

# Configure the session step by step: application name, Hive warehouse
# location on HDFS, Hive support; then create (or reuse) the session.
session_builder = SparkSession.builder.appName("Titanic - Analytics - MLlib")
session_builder = session_builder.config(
    "spark.sql.warehouse.dir", "hdfs://localhost:9000/warehouse"
)
spark = session_builder.enableHiveSupport().getOrCreate()

In [None]:
# Read the raw Titanic CSV files (with header row, inferred column types)
# from the data lake and cache the resulting DataFrame.
csv_reader = spark.read.options(inferSchema="true", header="true")
titanic_raw = csv_reader.csv("hdfs://localhost:9000/datalake/raw/kaggle/titanic/").cache()

In [None]:
# Preview the first five raw rows as a pandas frame.
titanic_raw.limit(5).toPandas()

In [None]:
# Show the column types produced by schema inference.
titanic_raw.printSchema()

In [None]:
# Row count of the raw dataset.
passengers_count = titanic_raw.count()
print (f"Total number of passenger: {passengers_count}")

In [None]:
# Summary statistics for every column of the raw data.
titanic_raw.summary().toPandas()

In [None]:
# Number of rows for each value of Survived.
titanic_raw.groupBy("Survived").count().toPandas()

In [None]:
# Number of rows for each (Sex, Survived) combination.
titanic_raw.groupBy("Sex","Survived").count().toPandas()

In [None]:
# Number of rows for each (Pclass, Survived) combination.
titanic_raw.groupBy("Pclass","Survived").count().toPandas()

In [None]:
# Working DataFrame for the cleaning steps; titanic_raw itself is not reassigned.
titanic_df = titanic_raw

In [None]:
from pyspark.sql.functions import isnull, when, count, col

# Option 1: one output column per input column holding its number of nulls.
# when(...) yields a non-null value only for null cells, and count() counts
# only non-null values.
null_count_exprs = [
    count(when(col(c).isNull(), c)).alias(c)
    for c in titanic_df.columns
]
titanic_df.select(null_count_exprs).toPandas()

In [None]:
# Option 2: inspect the summary statistics of every column instead.
titanic_df.summary().toPandas()

In [None]:
# Remove the Cabin column from the working DataFrame.
titanic_df = titanic_df.drop("Cabin")

In [None]:
# Number of rows for each Embarked value.
titanic_df.groupBy("Embarked").count().toPandas()

In [None]:
# Replace missing Embarked values with 'S'.
# NOTE(review): presumably 'S' is the most frequent port in the counts above - confirm.
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})

In [None]:
from pyspark.sql.functions import regexp_extract
# Add an "Initial" column holding the run of letters that directly precedes a
# literal dot in Name (capture group 1). The pattern is a raw string so that
# the backslash reaches the regex engine without Python treating "\." as an
# (invalid) string escape.
titanic_df = titanic_df.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))

In [None]:
# Preview five rows of the working DataFrame.
titanic_df.limit(5).toPandas()

In [None]:
# Distinct Initial values, sorted.
titanic_df.select("Initial").distinct().sort("Initial").toPandas()

In [None]:
# Collapse rare titles into a smaller set of groups. The mapping is a dict so
# every title sits next to its replacement, and `subset` restricts the
# substitution to the Initial column instead of every string column.
title_replacements = {
    'Mlle': 'Miss', 'Mme': 'Miss', 'Ms': 'Miss',
    'Dr': 'Mr', 'Major': 'Mr', 'Capt': 'Mr', 'Sir': 'Mr', 'Don': 'Mr',
    'Lady': 'Mrs', 'Countess': 'Mrs',
    'Jonkheer': 'Other', 'Col': 'Other', 'Rev': 'Other',
}
titanic_df = titanic_df.replace(title_replacements, subset=["Initial"])

In [None]:
# Distinct Initial values after the replacement step.
titanic_df.select("Initial").distinct().toPandas()

In [None]:
from pyspark.sql.functions import round

# Average Age per Initial; the aggregate column is renamed back to "Age".
age_by_initial = titanic_df.groupBy('Initial').agg({'Age': 'avg'})
avg_age_df = age_by_initial.withColumnRenamed("avg(Age)","Age")
avg_age_df.toPandas()

In [None]:
# Rows whose Age is null, with the Age column itself removed.
age_is_missing = col("Age").isNull()
titanic_df_noage = titanic_df.filter(age_is_missing).drop("Age")
titanic_df_noage.limit(1).toPandas()

In [None]:
# Attach the per-Initial average as the Age of the rows that had none
# (join on Initial, default inner join).
titanic_df_noage_with_avg = titanic_df_noage.join(avg_age_df, on="Initial", how="inner")
titanic_df_noage_with_avg.limit(1).toPandas()

In [None]:
# Recombine the rows that already had an Age with the rows whose Age was
# filled in; columns are matched by name.
titanic_df_with_age = titanic_df.filter(col("Age").isNotNull())
titanic_df_fixed = titanic_df_with_age.unionByName(titanic_df_noage_with_avg)

# Number of rows still lacking an Age after the fill.
titanic_df_fixed.filter(col("Age").isNull()).count()

In [None]:
# Continue with the DataFrame in which Age has been filled.
titanic_df = titanic_df_fixed

In [None]:
# Family_Size is the sum of the SibSp and Parch columns.
family_size_expr = col('SibSp') + col('Parch')
titanic_df = titanic_df.withColumn("Family_Size", family_size_expr)

In [None]:
# Number of rows for each Family_Size value.
titanic_df.groupBy("Family_Size").count().toPandas()

In [None]:
from pyspark.sql.functions import lit
# Alone is 1 when Family_Size equals 0, and 0 otherwise.
has_no_family = col("Family_Size") == 0
titanic_df = titanic_df.withColumn("Alone", when(has_no_family, 1).otherwise(lit(0)))

In [None]:
# Column names of the working DataFrame at this point.
titanic_df.columns

In [None]:
from pyspark.sql.functions import countDistinct

# Number of distinct values in each column.
titanic_df.select([countDistinct(c).alias(c) for c in titanic_df.columns]).toPandas()

In [None]:
# Remove PassengerId, Name, Ticket and Initial from the working DataFrame.
titanic_df = titanic_df.drop("PassengerId","Name","Ticket","Initial")

In [None]:
# Schema before the numeric columns are cast.
titanic_df.printSchema()

In [None]:
# Keep the ten modelling columns in a fixed order; every column except the
# two left uncast (Sex, Embarked) is cast to double.
selected_fields = ['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch',
                   'Fare', 'Embarked', 'Family_Size', 'Alone']
uncast_fields = {'Sex', 'Embarked'}

titanic_df = titanic_df.select(
    [col(name) if name in uncast_fields else col(name).cast('double')
     for name in selected_fields]
)

In [None]:
# Schema after the cast.
titanic_df.printSchema()

In [None]:
label_column = "Survived"

# Split the non-label columns by their Spark dtype: "string" columns are
# categorical, "double" columns are numeric; any other dtype is ignored.
categoricalCols = []
numericCols = []
for field, dataType in titanic_df.dtypes:
    if field == label_column:
        continue
    if dataType == "string":
        categoricalCols.append(field)
    elif dataType == "double":
        numericCols.append(field)

print(f"categorical columns: {categoricalCols}")
print(f"numerical columns: {numericCols}")

In [None]:
# Output column names for the indexing and one-hot stages: the categorical
# column name with an "Index" / "OHE" suffix.
indexOutputCols = [f"{name}Index" for name in categoricalCols]
oheOutputCols = [f"{name}OHE" for name in categoricalCols]

print(f"StringIndexer column names: {indexOutputCols}")
print(f"OHE column names: {oheOutputCols}")

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Index the categorical string columns; handleInvalid is set to "skip".
stringIndexer = StringIndexer(
    inputCols=categoricalCols,
    outputCols=indexOutputCols,
    handleInvalid="skip",
)

# One-hot encode the indexed columns.
oheEncoder = OneHotEncoder(
    inputCols=indexOutputCols,
    outputCols=oheOutputCols,
)

In [None]:
# Fit the indexer on its own to inspect the columns it adds. Only a handful
# of rows are collected for display, matching the other preview cells,
# rather than pulling the whole DataFrame to the driver.
temp_df = stringIndexer.fit(titanic_df).transform(titanic_df)
temp_df.limit(5).toPandas()

In [None]:
# Fit the one-hot encoder on the indexed data and preview a few encoded rows
# instead of collecting the whole DataFrame to the driver.
oheEncoder.fit(temp_df).transform(temp_df).limit(5).toPandas()

In [None]:
# Feature inputs: the one-hot columns first, then the numeric columns.
assemblerInputs = [*oheOutputCols, *numericCols]
print("Feature columns: ", assemblerInputs)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Combine all feature inputs into a single "features" vector column.
vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol="features")

In [None]:
from pyspark.ml import Pipeline

# Dry run of the feature stages only (no classifier) to inspect the output.
feature_stages = [stringIndexer, oheEncoder, vecAssembler]
test_pipeline = Pipeline(stages=feature_stages)
features_df = test_pipeline.fit(titanic_df).transform(titanic_df)
features_df.limit(2).toPandas()

In [None]:
# Seed passed to the seeded classifiers and to randomSplit.
seed = 11

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LinearSVC

# Every candidate reads the same label and feature columns.
shared_cols = {"labelCol": "Survived", "featuresCol": "features"}

lr = LogisticRegression(**shared_cols)
dt = DecisionTreeClassifier(seed=seed, **shared_cols)
rf = RandomForestClassifier(maxDepth=10, seed=seed, **shared_cols)
gbt = GBTClassifier(maxIter=10, seed=seed, **shared_cols)
nb = NaiveBayes(**shared_cols)
svm = LinearSVC(**shared_cols)

classifiers = [lr, dt, rf, gbt, nb, svm]
classifiers

In [None]:
from pyspark.ml import Pipeline

def create_pipeline(classifier):
    """Return a Pipeline of the notebook's feature stages followed by `classifier`.

    The stages are, in order: stringIndexer, oheEncoder, vecAssembler and the
    given classifier, so the classifier is always the last stage.
    """
    stages = [stringIndexer, oheEncoder, vecAssembler]
    stages.append(classifier)
    return Pipeline(stages=stages)

# One pipeline per candidate classifier, in the same order as `classifiers`.
pipelines = list(map(create_pipeline, classifiers))
pipelines

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluator reporting accuracy against the Survived label.
evaluator = MulticlassClassificationEvaluator(labelCol="Survived",  metricName="accuracy")

In [None]:
# Plain 80/20 random split with a fixed seed.
(trainingData, testData) = titanic_df.randomSplit([0.8,0.2],seed=seed)

In [None]:
# Stratified 80/20 split: each Survived class is split separately with the
# same seed, and the per-class parts are then recombined.
(trainingData1, testData1) = titanic_df.where("Survived=0").randomSplit([0.8,0.2],seed=seed)
(trainingData2, testData2) = titanic_df.where("Survived=1").randomSplit([0.8,0.2],seed=seed)

# Both names must be reassigned together. If trainingData kept the value of
# an earlier, different split while testData came from this one, test rows
# could also appear in the training set and inflate the accuracy figures.
trainingData = trainingData1.unionByName(trainingData2)
testData = testData1.unionByName(testData2)

In [None]:
# Fit every candidate pipeline on the training data, preserving order.
models = []
for pipeline in pipelines:
    models.append(pipeline.fit(trainingData))
models

In [None]:
# Score each fitted pipeline on the test data and record the class name of
# its final stage (the classifier model) next to its accuracy.
names = []
values = []
for model in models:
    score = evaluator.evaluate(model.transform(testData))
    names.append(type(model.stages[-1]).__name__)
    values.append(score)

# One row per model, best accuracy first.
data = {'name': names, 'accuracy': values, 'model': models}
df = pd.DataFrame(data).sort_values(by=['accuracy'], ascending=False)
df

In [None]:
# df is sorted by accuracy in descending order, so the first row holds the top-scoring model.
best_model=df.iloc[0]['model']

In [None]:
# Cross-tabulate actual Survived values (rows) against predictions (columns) on the test data.
best_model.transform(testData).groupby("Survived").pivot("prediction").count().toPandas()

In [None]:
# Persist the selected pipeline model to HDFS, overwriting any model already at this path.
modelPath = "hdfs://localhost:9000/model-registry/titanic-survival-classifier"
best_model.write().overwrite().save(modelPath)

In [None]:
from pyspark.ml import PipelineModel
# Load the pipeline model back from the path it was saved to.
savedModel = PipelineModel.load(modelPath)

In [None]:
# Score the test data with the reloaded model and show up to 200 rows of
# features, label and prediction.
predictions = savedModel.transform(testData)
prediction_preview = predictions.select("features", "Survived", "prediction")
prediction_preview.limit(200).toPandas()