<a href="https://colab.research.google.com/github/sgambuti/Feature_Engineering/blob/main/feature_engineering_titanic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
# Install a headless Java 8 JDK with apt; -qq plus the redirect to /dev/null keeps the cell output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [6]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
!pip install -q findspark
import findspark
findspark.init()

In [8]:

from pyspark.sql import SparkSession
from pyspark.ml import feature
from pyspark.ml import classification
from pyspark.sql import functions as fn
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
    MulticlassClassificationEvaluator, \
    RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.sql import SparkSession


# Obtain the SparkSession (getOrCreate) and keep a handle to its SparkContext.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [9]:
# Load the Titanic CSV, treating the first row as a header and inferring column types.
# NOTE(review): the recorded run of this cell raised AnalysisException; presumably the relative
# path 'datasets/titanic_original.csv' is missing from the runtime's working directory — TODO confirm
# and make sure the file is uploaded/downloaded before this cell runs.
titanic_df = spark.read.csv('datasets/titanic_original.csv', header=True, inferSchema=True)

AnalysisException: ignored

In [None]:
# Preview the first 10 rows of the raw data as a pandas DataFrame.
titanic_df.limit(10).toPandas()

In [None]:
# Basic cleanup, in order: drop the 'boat' and 'body' columns, rename the dotted
# 'home.dest' column, fill nulls with the string 'O' (a string fill value, so
# presumably only string columns are affected), then drop rows that still have
# a null in any column used by the models.
drop_cols = ['boat', 'body']
new_titanic_df = (
    titanic_df
    .drop(*drop_cols)
    .withColumnRenamed('home.dest', 'home_dest')
    .fillna('O')
    .dropna(subset=['pclass', 'age', 'sibsp', 'parch', 'fare', 'survived'])
)

In [None]:
# Preview the first 10 rows of the cleaned data as a pandas DataFrame.
new_titanic_df.limit(10).toPandas()

In [None]:
# 80/20 train/test split with a fixed seed.
training, test = new_titanic_df.randomSplit(weights=[0.8, 0.2], seed=0)

## classic pipeline

In [None]:
# Baseline pipeline: assemble the five numeric columns into 'features',
# then a logistic regression on those features with 'survived' as the label.
model0 = Pipeline(
    stages=[
        feature.VectorAssembler(
            inputCols=['pclass', 'age', 'sibsp', 'parch', 'fare'],
            outputCol='features',
        ),
        classification.LogisticRegression(
            labelCol='survived',
            featuresCol='features',
        ),
    ]
)

In [None]:
# Fit the baseline pipeline on the training split.
model0_fitted = model0.fit(training)

In [None]:
# Test-set accuracy: average of the indicator "prediction = survived" cast to float.
(
    model0_fitted
    .transform(test)
    .select(fn.avg(fn.expr('prediction = survived').cast('float')))
    .show()
)

In [None]:
# Mean of 'survived' over the cleaned data, i.e. the overall survival rate.
new_titanic_df.select(fn.avg('survived')).show()

In [None]:
# Binary-classification evaluator that reads labels from 'survived'; no metric is set, so the evaluator's default applies.
evaluator = BinaryClassificationEvaluator(labelCol='survived')

In [None]:
# Score the baseline model's predictions on the test split.
evaluator.evaluate(model0_fitted.transform(test))

If we want to modify the pipeline to add "sex" (gender) as a feature, we need to change both the point of entry and the transformation that follows it.

In [None]:
# Baseline pipeline extended with gender: index 'sex' into 'encoded_sex',
# append it to the numeric 'features' vector, and train on 'final_features'.
model1 = Pipeline(
    stages=[
        feature.VectorAssembler(
            inputCols=['pclass', 'age', 'sibsp', 'parch', 'fare'],
            outputCol='features',
        ),
        feature.StringIndexer(inputCol='sex', outputCol='encoded_sex'),
        feature.VectorAssembler(
            inputCols=['features', 'encoded_sex'],
            outputCol='final_features',
        ),
        classification.LogisticRegression(
            labelCol='survived',
            featuresCol='final_features',
        ),
    ]
)

In [None]:
# Fit the gender-augmented pipeline on the training split.
model1_fitted = model1.fit(training)

In [None]:
# Score the gender-augmented model on the test split with the same evaluator.
evaluator.evaluate(model1_fitted.transform(test))

You can use the professor's package `pyspark_pipes` to do this more easily

In [None]:
# Install pyspark_pipes straight from its GitHub repository (no version/commit is pinned).
!pip install git+https://github.com/daniel-acuna/pyspark_pipes.git

In [None]:
# package that makes it easy to build pipelines
from pyspark_pipes import pipe

In [None]:
# Same model as the explicit Pipeline above, built with pipe(): the tuple holds
# two parallel branches (numeric assembler, indexed 'sex') that feed the next
# VectorAssembler. Presumably pipe() wires output/input column names between
# consecutive stages — TODO confirm against the pyspark_pipes docs.
uber_model = pipe(
    (
        feature.VectorAssembler(inputCols=['pclass', 'age', 'sibsp', 'parch', 'fare']),
        feature.StringIndexer(inputCol='sex'),
    ),
    feature.VectorAssembler(),
    classification.LogisticRegression(labelCol='survived'),
)

In [None]:
# Fit the pipe()-built pipeline on the training split.
uber_model_fitted = uber_model.fit(training)

In [None]:
# Apply the fitted pipeline to the test split; the resulting DataFrame is the cell's displayed value.
uber_model_fitted.transform(test)

# Automated evaluator

In [None]:
def binary_evaluation(model_pipeline, model_fitted, data):
    """Score a fitted pipeline on ``data`` with a BinaryClassificationEvaluator.

    The label and raw-prediction column names are read from the last stage of
    ``model_pipeline`` so the evaluator always matches the pipeline's final
    estimator.

    Args:
        model_pipeline: unfitted pipeline; must expose ``getStages()`` and its
            last stage must expose ``getLabelCol()`` / ``getRawPredictionCol()``.
        model_fitted: fitted counterpart exposing ``transform(data)``.
        data: dataset to score.

    Returns:
        Whatever the evaluator's ``evaluate`` returns for the transformed data.
    """
    final_stage = model_pipeline.getStages()[-1]
    scorer = BinaryClassificationEvaluator(
        labelCol=final_stage.getLabelCol(),
        rawPredictionCol=final_stage.getRawPredictionCol(),
    )
    return scorer.evaluate(model_fitted.transform(data))

# Initial model

$$
p(\text{survived} = 1) = f(\text{pclass}, \text{age}, \text{sibsp}, \text{parch}, \text{fare})
$$

In [None]:
# Initial model via pipe(): five numeric columns -> logistic regression on 'survived'.
model1_pipeline = pipe(
    feature.VectorAssembler(inputCols=['pclass', 'age', 'sibsp', 'parch', 'fare']),
    classification.LogisticRegression(labelCol='survived'),
)

In [None]:
# Fit the initial model on the training split (this rebinds model1_fitted, which an earlier cell also assigns).
model1_fitted = model1_pipeline.fit(training)

In [None]:
# Evaluate the initial model on the test split.
binary_evaluation(model1_pipeline, model1_fitted, test)

In [None]:
# Coefficients of the last fitted stage (the logistic regression).
model1_fitted.stages[-1].coefficients

In [None]:
# Intercept of the last fitted stage (the logistic regression).
model1_fitted.stages[-1].intercept

# Some preprocessing of the data

In [None]:
# Pairplot of the five raw numeric columns, pulled into pandas first.
sns.pairplot(new_titanic_df.toPandas()[['pclass', 'age', 'sibsp', 'parch', 'fare']])

In [None]:
import numpy as np

In [None]:
# Pairplot of the standardized features: assemble + StandardScaler(withMean=True),
# take the last column of the pandas result (the scaler output vectors) and
# stack the vectors into a 2-D array.
sns.pairplot(
    pd.DataFrame(
        np.vstack(
            pipe(
                feature.VectorAssembler(inputCols=['pclass', 'age', 'sibsp', 'parch', 'fare']),
                feature.StandardScaler(withMean=True),
            )
            .fit(new_titanic_df)
            .transform(new_titanic_df)
            .toPandas()
            .iloc[:, -1]
        )
    )
)

In [None]:
# Histogram of the raw age column.
(
    new_titanic_df
    .toPandas()[['pclass', 'age', 'sibsp', 'parch', 'fare']]
    .age
    .hist()
)
plt.xlabel('age')

In [None]:
# Histogram of age after standardization: the scaled vectors (last pandas column)
# are stacked into a frame whose columns are named after the assembler inputs.
d = pd.DataFrame(
    np.vstack(
        pipe(
            feature.VectorAssembler(inputCols=['pclass', 'age', 'sibsp', 'parch', 'fare']),
            feature.StandardScaler(withMean=True),
        )
        .fit(new_titanic_df)
        .transform(new_titanic_df)
        .toPandas()
        .iloc[:, -1]
    )
)
d.columns = ['pclass', 'age', 'sibsp', 'parch', 'fare']
d.age.hist()
plt.xlabel('age')

## Initial model on standardized results

In [None]:
# Initial model with a StandardScaler(withMean=True) between the assembler and the classifier.
model2_pipeline = pipe(
    feature.VectorAssembler(inputCols=['pclass', 'age', 'sibsp', 'parch', 'fare']),
    feature.StandardScaler(withMean=True),
    classification.LogisticRegression(labelCol='survived'),
)

In [None]:
# Fit the standardized model on the training split.
model2_fitted = model2_pipeline.fit(training)

In [None]:
# Re-evaluate the unscaled initial model so its score sits next to the standardized model's.
binary_evaluation(model1_pipeline, model1_fitted, test)

In [None]:
# Evaluate the standardized model on the test split.
binary_evaluation(model2_pipeline, model2_fitted, test)

In [None]:
# Intercept of the standardized model's last stage (the logistic regression).
model2_fitted.stages[-1].intercept

In [None]:
# Coefficients of the standardized model's last stage (the logistic regression).
model2_fitted.stages[-1].coefficients

## other scaling

In [None]:
# Histogram of age after MaxAbsScaler: the scaled vectors (last pandas column)
# are stacked into a frame whose columns are named after the assembler inputs.
d = pd.DataFrame(
    np.vstack(
        pipe(
            feature.VectorAssembler(inputCols=['pclass', 'age', 'sibsp', 'parch', 'fare']),
            feature.MaxAbsScaler(),
        )
        .fit(new_titanic_df)
        .transform(new_titanic_df)
        .toPandas()
        .iloc[:, -1]
    )
)
d.columns = ['pclass', 'age', 'sibsp', 'parch', 'fare']
d.age.hist()
plt.xlabel('age')

# Bucketizer

In [None]:
# Histogram of the raw fare column.
(
    new_titanic_df
    .toPandas()[['pclass', 'age', 'sibsp', 'parch', 'fare']]
    .fare
    .hist()
)
plt.xlabel('fare')

In [None]:
# Bucket fare with hand-picked split points and plot the bucket index
# (the last column of the transformed frame).
(
    feature.Bucketizer(splits=[0, 20, 50, 100, 400, 800], inputCol='fare')
    .transform(new_titanic_df)
    .toPandas()
    .iloc[:, -1]
    .hist()
)
plt.xticks([-1, 0, 1, 2, 3, 4, 5]);
plt.xlabel('Fare bucket')

In [None]:
# Fit a 4-bucket quantile discretizer on fare, writing to a 'result' column.
qd = (
    feature.QuantileDiscretizer()
    .setNumBuckets(4)
    .setInputCol("fare")
    .setOutputCol("result")
    .fit(new_titanic_df)
)

In [None]:
# Show the split points held by the fitted discretizer.
qd.getSplits()

In [None]:
# Fit and apply a 4-bucket quantile discretizer on fare, then plot the
# 'result' bucket index (the last column of the transformed frame).
(
    feature.QuantileDiscretizer()
    .setNumBuckets(4)
    .setInputCol("fare")
    .setOutputCol("result")
    .fit(new_titanic_df)
    .transform(new_titanic_df)
    .toPandas()
    .iloc[:, -1]
    .hist()
)
plt.xticks([-1, 0, 1, 2, 3, 4, 5]);
plt.xlabel('Fare quantiles')

In [None]:
# Histogram of age after MaxAbsScaler (same computation as the "other scaling" cell):
# the scaled vectors (last pandas column) are stacked into a named frame.
d = pd.DataFrame(
    np.vstack(
        pipe(
            feature.VectorAssembler(inputCols=['pclass', 'age', 'sibsp', 'parch', 'fare']),
            feature.MaxAbsScaler(),
        )
        .fit(new_titanic_df)
        .transform(new_titanic_df)
        .toPandas()
        .iloc[:, -1]
    )
)
d.columns = ['pclass', 'age', 'sibsp', 'parch', 'fare']
d.age.hist()
plt.xlabel('age')

In [None]:
# Histogram of the raw age column, for comparison with the scaled version.
new_titanic_df.toPandas().age.hist()
plt.xlabel('age')

In [None]:
# Full model: combine scaled numeric, fare-quantile, gender, title, embarked and cabin features

In [None]:
# Index the 'sex' column; handleInvalid='skip' is set for values the indexer cannot handle.
gender_pipe = feature.StringIndexer(inputCol='sex', handleInvalid='skip')

In [None]:
# Honorific titles to look for in passenger names, lower-cased and split on whitespace into a list.
titles_list = " Capt  Col  Don  Dona  Dr  Jonkheer  Lady  Major  Master  Miss  Mlle  Mme  Mr  Mrs  Ms  Rev  Sir".lower().split()

In [None]:
# Title features: tokenize 'name' with a word-bounded alternation of the known
# titles (gaps=False, so presumably the pattern matches the tokens themselves
# rather than the separators — TODO confirm), then count the matched tokens.
title_pipe = pipe(
    feature.RegexTokenizer(
        pattern="\\b(" + ("|".join(titles_list)) + ")\\b",
        gaps=False,
        inputCol='name',
    ),
    feature.CountVectorizer(),
)

In [None]:
# List the distinct values of the 'embarked' column.
new_titanic_df.select('embarked').distinct().show()

In [None]:
# Embarkation features: index 'embarked', then one-hot encode the index.
embarked_pipe = pipe(
    feature.StringIndexer(inputCol='embarked', handleInvalid='skip'),
    feature.OneHotEncoder(),
)

In [None]:
# Sanity check: fit and apply the embarked pipeline on just that column and show the distinct rows.
(
    embarked_pipe
    .fit(new_titanic_df.select('embarked'))
    .transform(new_titanic_df.select('embarked'))
    .distinct()
    .show()
)

In [None]:
# Cabin features: take the first character of 'cabin' as 'cabin_col',
# index it into 'cabin_col2', then one-hot encode that index.
cabin_pipe = Pipeline(
    stages=[
        feature.SQLTransformer(
            statement='select *, substring(cabin,1,1) as cabin_col from __THIS__'
        ),
        feature.StringIndexer(
            inputCol='cabin_col',
            outputCol='cabin_col2',
            handleInvalid='skip',
        ),
        feature.OneHotEncoder(inputCol='cabin_col2'),
    ]
)

In [None]:
# Numeric features (fare excluded; it is discretized separately): assemble, then StandardScaler with its defaults.
numerical_features = pipe(
    feature.VectorAssembler(inputCols=['pclass', 'age', 'sibsp', 'parch']),
    feature.StandardScaler(),
)

In [None]:
# Combine every feature branch (numeric, fare quartile, gender, title,
# embarked, cabin) and assemble their outputs with a final VectorAssembler.
all_features = pipe(
    (
        numerical_features,
        feature.QuantileDiscretizer().setNumBuckets(4).setInputCol("fare").setOutputCol("result"),
        gender_pipe,
        title_pipe,
        embarked_pipe,
        cabin_pipe,
    ),
    feature.VectorAssembler(),
)

In [None]:
# Classifier for the full model; kept in its own variable so the parameter grid can reference its params.
lr = classification.LogisticRegression(labelCol='survived')

In [None]:
# Full pipeline: all feature branches followed by the logistic regression.
final_model_pipeline = pipe(all_features, lr)

In [None]:
# Hyper-parameter grid over the classifier's elastic-net mixing and regularization strength.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.elasticNetParam, [0., 0.01, 0.1])
    .addGrid(lr.regParam, [0.1, 0.01, 0.001, 0.0001])
    .build()
)

In [None]:
# Number of parameter combinations in the grid.
len(paramGrid)

In [None]:
# Evaluator wired to the classifier's label / raw-prediction columns (this
# rebinds the earlier `evaluator`), and a 2-fold cross-validator over the grid.
evaluator = BinaryClassificationEvaluator(
    labelCol=lr.getLabelCol(),
    rawPredictionCol=lr.getRawPredictionCol(),
)
crossval = CrossValidator(
    estimator=final_model_pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=2,
)

In [None]:
# Run the cross-validated grid search on the training split.
final_model_fitted = crossval.fit(training)

In [None]:
# Print the first rows of the test split.
test.show()

In [None]:
# Score the cross-validated model on the held-out test split.
evaluator.evaluate(final_model_fitted.transform(test))

In [None]:
# Second stage of the best pipeline; presumably the fitted fare discretizer
# (getSplits is called on it below) — TODO confirm the stage order produced by pipe().
b = final_model_fitted.bestModel.stages[1]

In [None]:
# Input column of stage `b`.
b.getInputCol()

In [None]:
# Split points held by stage `b`.
b.getSplits()

In [None]:
# Coefficients of the best pipeline's last stage (presumably the fitted logistic regression).
final_model_fitted.bestModel.stages[-1].coefficients

In [None]:
# Input columns of the first sub-stage of the (nested) first stage; presumably the numeric VectorAssembler — TODO confirm.
final_model_fitted.bestModel.stages[0].stages[0].getInputCols()

In [None]:
# Display the second stage of the best pipeline.
final_model_fitted.bestModel.stages[1]

In [None]:
# Input column of the second stage of the best pipeline.
final_model_fitted.bestModel.stages[1].getInputCol()

In [None]:
# Input column of the second stage again (identical expression to the preceding cell).
final_model_fitted.bestModel.stages[1].getInputCol()

In [None]:
# Third stage of the best pipeline; presumably the fitted StringIndexer for 'sex'
# (its labels are read below) — TODO confirm the stage order.
si = final_model_fitted.bestModel.stages[2]

In [None]:
# Labels held by stage `si`.
si.labels

In [None]:
# Vocabulary of the second sub-stage of the fourth stage; presumably the title CountVectorizer — TODO confirm.
final_model_fitted.bestModel.stages[3].stages[1].vocabulary

In [None]:
# Number of coefficients in the last stage, i.e. the length of the final feature vector it was trained on.
len(final_model_fitted.bestModel.stages[-1].coefficients)

In [None]:
# Display the last stage of the best pipeline.
final_model_fitted.bestModel.stages[-1]

In [None]:
# Coefficients of the last stage again (identical expression to an earlier cell).
final_model_fitted.bestModel.stages[-1].coefficients

In [None]:
# Keep a handle to the best pipeline's last stage (presumably the fitted logistic regression).
lr_fit = final_model_fitted.bestModel.stages[-1]

In [None]:
# Name of the features column recorded in the model's training summary.
lr_fit.summary.featuresCol

In [None]:
# Sub-stages of the best pipeline's first (nested) stage.
final_model_fitted.bestModel.stages[0].stages