# Machine learning model

In [0]:

%run ./configuration


In [0]:
titanic_df = spark.read.format("delta").load(gold_path+'gold_data')

## Train/Test Split

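Let's use the same 80/20 train/test split. Fixing `seed=42` makes the split repeatable across runs, as long as the underlying data and its partitioning stay the same.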

In [0]:
trainDF, testDF = titanic_df.randomSplit([.8, .2], seed=42)
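`randomSplit` assigns each row to a split at random instead of cutting the data at an exact row count, so the resulting sizes are only approximately 80/20. The pure-Python sketch below imitates that idea with a seeded generator. The helper `random_split` is hypothetical and is not how Spark implements it (Spark samples within each partition); it only shows why a fixed seed gives a repeatable but approximate split.

```python
import random

def random_split(rows, weights, seed):
    """Hypothetical helper: assign each row to one split at random.

    Weights are normalized, so [.8, .2] and [4, 1] behave the same.
    """
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [.8, .2].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point round-off

    rng = random.Random(seed)  # same seed -> same assignment
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0.0, 1.0)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train, test = random_split(rows, [.8, .2], seed=42)
print(len(train), len(test))  # roughly 800 and 200
```

Rerunning with `seed=42` reproduces the same assignment; changing the seed changes which rows land in each split and nudges the sizes.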

## Categorical Variables

There are a few ways to handle categorical features:
* Assign them a numeric value
* Create "dummy" variables (also known as One Hot Encoding)
* Generate embeddings (mainly used for textual data)

### One Hot Encoder
Here, we One Hot Encode (OHE) our categorical variables. Spark has no equivalent of pandas' `get_dummies` function, so OHE is a two-step process. First, we use [StringIndexer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html?highlight=stringindexer#pyspark.ml.feature.StringIndexer) to map a string column of labels to a column of numeric label indices.

Then, we can apply the [OneHotEncoder](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html?highlight=onehotencoder#pyspark.ml.feature.OneHotEncoder) to the output of the StringIndexer.
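To make the two steps concrete, here is a pure-Python sketch of what they compute for a single column, assuming Spark's defaults: `StringIndexer` orders labels by descending frequency (index 0 is the most frequent label, ties sorted alphabetically in recent Spark versions), and `OneHotEncoder` uses `dropLast=True`, so a column with k categories becomes a vector of length k - 1 and the last category is all zeros. The `embarked` values and both helper functions are hypothetical; Spark itself returns sparse vectors rather than Python lists.

```python
from collections import Counter

def string_index(values):
    """Sketch of StringIndexer's default ordering (frequencyDesc).

    The most frequent label gets index 0; ties are broken alphabetically.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Sketch of OneHotEncoder for a single index.

    With drop_last=True (Spark's default) the vector has
    num_categories - 1 slots and the last category is all zeros.
    """
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

embarked = ["S", "C", "S", "Q", "S", "C"]  # hypothetical toy column
mapping = string_index(embarked)
print(mapping)  # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
print([one_hot(mapping[v], len(mapping)) for v in ["S", "C", "Q"]])
# [[1.0, 0.0], [0.0, 1.0], [0.0, 0.0]]
```

Dropping the last slot avoids a redundant column: if every other slot is 0, the value must be the dropped category.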

In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)
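The `handleInvalid="skip"` setting on the `StringIndexer` matters at transform time: a row whose categorical value was never seen during fitting (or is null) is dropped rather than indexed, so the prediction output can have fewer rows than `testDF`. The other options are `"error"` (raise) and `"keep"` (send it to an extra index equal to the number of known labels). The sketch below mimics those three behaviors for one column in plain Python; `index_values` and the sample data are hypothetical.

```python
def index_values(values, mapping, handle_invalid="skip"):
    """Sketch of StringIndexer's handleInvalid options at transform time.

    'error' raises, 'skip' drops the row, 'keep' sends it to an extra
    index equal to the number of known labels. None counts as invalid.
    """
    out = []
    for value in values:
        if value in mapping:
            out.append(mapping[value])
        elif handle_invalid == "skip":
            continue  # row silently disappears from the output
        elif handle_invalid == "keep":
            out.append(float(len(mapping)))
        else:
            raise ValueError(f"Unseen label: {value!r}")
    return out

mapping = {"S": 0.0, "C": 1.0, "Q": 2.0}  # learned from training data
test_values = ["S", "X", None, "Q"]       # "X" never appeared in training
print(index_values(test_values, mapping, "skip"))  # [0.0, 2.0]
print(index_values(test_values, mapping, "keep"))  # [0.0, 3.0, 3.0, 2.0]
```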

## Vector Assembler

Now we combine the OHE categorical features and the numeric features into a single vector column, `features`, which is the input format Spark ML estimators expect.

In [0]:
from pyspark.ml.feature import VectorAssembler

# Numeric features: every double column except the label ("Survived")
numericCols = [field for (field, dataType) in trainDF.dtypes if dataType == "double" and field != "Survived"]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
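`VectorAssembler` concatenates its input columns, in `inputCols` order, into one vector; vector-valued inputs such as the OHE outputs are expanded slot by slot, and each numeric column adds one slot. The sketch below imitates that with plain lists. `assemble` and the column names in the sample row are hypothetical. Note also that the `dataType == "double"` filter keeps only double columns, so an integer-typed numeric column would not end up in `features`.

```python
def assemble(row, input_cols):
    """Sketch of VectorAssembler: concatenate columns in input_cols order.

    Vector-valued columns (lists here) are expanded; numeric columns
    contribute a single slot each.
    """
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, list):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

# Hypothetical row: column names and values are illustrative only.
row = {"SexOHE": [1.0], "EmbarkedOHE": [0.0, 1.0], "Age": 29.0, "Fare": 30.0}
print(assemble(row, ["SexOHE", "EmbarkedOHE", "Age", "Fare"]))
# [1.0, 0.0, 1.0, 29.0, 30.0]
```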

## Logistic Regression

Now that we have all of our features, let's build a logistic regression model to predict `Survived`.

In [0]:
from pyspark.ml.classification import LogisticRegression
# Training algorithm: predict "Survived" from the assembled "features" vector
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
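With two classes, Spark's `LogisticRegression` fits a binomial model: it scores a row as the sigmoid of a weighted sum of its features plus an intercept, and the `prediction` column is 1.0 when that probability exceeds `threshold` (0.5 by default). Below is a pure-Python sketch of that scoring step; the `predict` helper and its weights are made up for illustration and are not the coefficients this notebook learns.

```python
import math

def predict(features, coefficients, intercept, threshold=0.5):
    """Sketch of binary logistic regression scoring.

    probability = sigmoid(w . x + b); the predicted label is 1.0 when the
    probability exceeds the threshold.
    """
    margin = sum(w * x for w, x in zip(coefficients, features)) + intercept
    probability = 1.0 / (1.0 + math.exp(-margin))
    prediction = 1.0 if probability > threshold else 0.0
    return probability, prediction

# Hypothetical weights, not the ones learned in this notebook.
p, label = predict([1.0, 0.0, 29.0], coefficients=[2.0, -0.5, -0.03], intercept=0.1)
print(round(p, 3), label)
```

Raising `threshold` makes the model more conservative about predicting survival; lowering it does the opposite.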

## Pipeline

Let's put all these stages in a Pipeline. A [Pipeline](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.Pipeline.html?highlight=pipeline#pyspark.ml.Pipeline) chains our transformers and estimators so they run in a fixed order.

This way, the test dataset goes through exactly the same sequence of fitted transformations as the training dataset, and we don't have to remember that order ourselves.

In [0]:
from pyspark.ml import Pipeline

stages = [stringIndexer, oheEncoder, vecAssembler, lr]
pipeline = Pipeline(stages=stages)

pipelineModel = pipeline.fit(trainDF)
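Conceptually, `pipeline.fit` walks the stages in order: an estimator is fit on the current data to produce a fitted model, a transformer is used as-is, and the data is transformed before moving to the next stage. The resulting `PipelineModel` replays those fitted stages on new data, so values learned from `trainDF` (such as the StringIndexer's label mapping) are reused on `testDF`. The simplified sketch below uses hypothetical toy stages (`Scale`, `AddOne`) in plain Python; Spark's real implementation also skips the final transform after the last estimator.

```python
class Scale:
    """Toy estimator: learns a divisor on fit and returns a fitted model."""
    def fit(self, data):
        return ScaleModel(max(data))

class ScaleModel:
    """Toy fitted transformer produced by Scale.fit."""
    def __init__(self, divisor):
        self.divisor = divisor

    def transform(self, data):
        return [x / self.divisor for x in data]

class AddOne:
    """Toy transformer: nothing to learn."""
    def transform(self, data):
        return [x + 1 for x in data]

def fit_pipeline(stages, data):
    """Simplified Pipeline.fit: fit estimators in order, passing transformed data on."""
    fitted = []
    for stage in stages:
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        fitted.append(model)
        data = model.transform(data)
    return fitted

def transform_pipeline(fitted, data):
    """Simplified PipelineModel.transform: replay fitted stages in the same order."""
    for model in fitted:
        data = model.transform(data)
    return data

fitted = fit_pipeline([AddOne(), Scale()], [1.0, 3.0])  # learns divisor 4.0
print(transform_pipeline(fitted, [7.0]))  # [2.0], using the training divisor
```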

## Saving Models

We can save the fitted pipeline to persistent storage (e.g., DBFS) so that if the cluster goes down we don't have to retrain it. `overwrite()` replaces any model previously saved at `pipelinePath`.

In [0]:

pipelineModel.write().overwrite().save(pipelinePath)

## Loading models

When you load a model, you need to know which class to load it with (was it a `LinearRegressionModel` or a `LogisticRegressionModel`?).

For this reason, we recommend you always put your transformers/estimators into a Pipeline, so you can always load the generic PipelineModel back in.

In [0]:
from pyspark.ml import PipelineModel

savedPipelineModel = PipelineModel.load(pipelinePath)

## Apply model to test set

In [0]:
predDF = savedPipelineModel.transform(testDF)

display(predDF.select("features", "Survived", "prediction"))

| features (dense vector, length 7) | Survived | prediction |
|---|---|---|
| [0.0, 1.0, 0.0, 0.0, 1.0, 50.0, 28.7125] | 0 | 1.0 |
| [0.0, 1.0, 1.0, 1.0, 0.0, 21.0, 77.2875] | 0 | 1.0 |
| [0.0, 1.0, 1.0, 0.0, 1.0, 24.0, 79.2] | 0 | 1.0 |
| [0.0, 1.0, 1.0, 1.0, 0.0, 29.0, 30.0] | 0 | 0.0 |
| [0.0, 1.0, 1.0, 0.0, 1.0, 36.0, 40.125] | 0 | 1.0 |
| [0.0, 1.0, 1.0, 1.0, 0.0, 37.0, 25.925] | 0 | 0.0 |
| [0.0, 1.0, 1.0, 0.0, 1.0, 37.0, 30.6958] | 0 | 1.0 |
| [0.0, 1.0, 1.0, 1.0, 0.0, 37.0, 52.0] | 0 | 0.0 |
| [0.0, 1.0, 1.0, 0.0, 0.0, 44.0, 90.0] | 0 | 0.0 |
| [0.0, 1.0, 1.0, 1.0, 0.0, 45.0, 26.55] | 0 | 0.0 |


## Evaluate model

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Survived", metricName="rmse")

rmse = regressionEvaluator.evaluate(predDF)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"RMSE is {rmse}")
print(f"R2 is {r2}")

RMSE is 0.43943537440204117
R2 is 0.2076502732240436
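`RegressionEvaluator` treats the 0/1 predictions as real numbers, which is unusual for a classifier, but the RMSE can still be read as a classification metric. Because every label and prediction is 0 or 1, each squared error is 1 for a misclassified row and 0 otherwise, so RMSE² equals the misclassification rate: 0.4394² ≈ 0.193, i.e. an accuracy of about 0.807. The pure-Python check below confirms the identity on a small hypothetical sample and applies it to the RMSE reported above. For classifiers, Spark's usual choices are `BinaryClassificationEvaluator` (area under ROC by default) and `MulticlassClassificationEvaluator` with `metricName="accuracy"`.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between labels and predictions."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))

def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    return sum(y == p for y, p in zip(labels, predictions)) / len(labels)

labels = [0, 0, 1, 1, 0]                 # hypothetical sample
predictions = [1.0, 0.0, 1.0, 0.0, 0.0]  # two mistakes out of five
print(rmse(labels, predictions) ** 2, 1 - accuracy(labels, predictions))  # both about 0.4

# Applying the same identity to the RMSE reported above:
reported_rmse = 0.43943537440204117
print(f"Implied accuracy: {1 - reported_rmse ** 2:.3f}")  # Implied accuracy: 0.807
```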
