# SPARK hackfest-in-a-box for TELCO - LAB 2

The objective of this lab is to train a Spark MLlib Random Forest classification model that predicts customer churn. Random forests are ensembles of decision trees and are among the most successful machine learning models for classification and regression. They combine many decision trees to reduce the risk of overfitting. Like decision trees, random forests handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.

`spark.mllib` supports random forests for binary and multiclass classification and for regression, using both continuous and categorical features, and implements them on top of its existing decision tree implementation. See the decision tree guide for more information on trees. The code in this lab uses the DataFrame-based `spark.ml` API (`pyspark.ml`) rather than the RDD-based `spark.mllib` API.
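
To make the ensemble idea concrete, here is a toy, pure-Python sketch of a random forest classifier. It is not Spark's implementation: it assumes numeric features and uses one-split decision "stumps" as trees. Each tree is trained on a bootstrap sample of the rows (bagging) and only sees a random subset of the features, and the forest predicts by majority vote. The helpers `train_stump`, `train_forest` and `predict` are hypothetical names used only for illustration.

```python
import random
from collections import Counter


def train_stump(rows, labels, feature_ids):
    """Fit a one-split decision tree ("stump") using only the given features.

    Every (feature, threshold) pair is tried; each side of the split predicts its
    majority label, and the pair with the fewest training errors wins.
    """
    best = None
    for f in feature_ids:
        for threshold in sorted({r[f] for r in rows}):
            left = [y for r, y in zip(rows, labels) if r[f] <= threshold]
            right = [y for r, y in zip(rows, labels) if r[f] > threshold]
            left_label = Counter(left).most_common(1)[0][0]  # never empty: threshold is a data value
            right_label = Counter(right).most_common(1)[0][0] if right else left_label
            errors = sum(y != left_label for y in left) + sum(y != right_label for y in right)
            if best is None or errors < best[0]:
                best = (errors, f, threshold, left_label, right_label)
    _, f, threshold, left_label, right_label = best
    return lambda row: left_label if row[f] <= threshold else right_label


def train_forest(rows, labels, num_trees=25, features_per_tree=1, seed=42):
    """Train num_trees stumps, each on a bootstrap sample and a random feature subset."""
    rng = random.Random(seed)
    n, d = len(rows), len(rows[0])
    forest = []
    for _ in range(num_trees):
        sample = [rng.randrange(n) for _ in range(n)]       # bagging: rows drawn with replacement
        features = rng.sample(range(d), features_per_tree)  # random subset of feature indices
        forest.append(train_stump([rows[i] for i in sample], [labels[i] for i in sample], features))
    return forest


def predict(forest, row):
    """Majority vote over the predictions of all trees."""
    return Counter(tree(row) for tree in forest).most_common(1)[0][0]


# Made-up toy data: both features separate the classes at x = 10
rows = [(x, 2 * x) for x in range(20)]
labels = [1 if x >= 10 else 0 for x in range(20)]
forest = train_forest(rows, labels)
print(predict(forest, (3, 6)), predict(forest, (17, 34)))
```

A stump has a single split, so picking features per tree is the same as picking them per split here. Spark's `RandomForestClassifier` grows deeper trees (`maxDepth`, default 5), trains 20 trees by default (`numTrees`), and controls the per-node feature sampling with `featureSubsetStrategy`.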

## 1 Initial data loading

In [None]:
import sys, logging, argparse, random, tempfile, json
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.types import StructType, DoubleType, StringType, FloatType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Pre-defined input variables
print('....Setting input variables')
projectNbr = "YOUR_PROJECT_NBR"
projectID = "YOUR_PROJECT_ID"
appBaseName = "customer-churn-model"
appNameSuffix = "training"
appName = f"{appBaseName}-{appNameSuffix}"
modelBaseNm = appBaseName
bqDatasetNm = f"{projectID}.customer_churn_ds"
operation = appNameSuffix
bigQuerySourceTableFQN = f"{bqDatasetNm}.training_data_notebook"

In [None]:
# SPARK session creation
print('....Initializing spark & spark configs')
spark = SparkSession.builder.appName(appName).getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [None]:
# Read training data and typecasting
print('....Read the training dataset into a dataframe')
inputDF = spark.read \
    .format('bigquery') \
    .load(bigQuerySourceTableFQN)
# Typecast some columns to the right datatype
inputDF = inputDF.withColumn("monthly_charges", inputDF.monthly_charges.cast('float')) \
    .withColumn("total_charges", inputDF.total_charges.cast('float'))

#### Q - Split the data into training and test datasets
* Use `randomSplit` to split the data into the `trainDF` and `testDF` dataframes

In [None]:
#INSERT CODE
SPLIT_SEED = 6
SPLIT_SPECS = [0.8, 0.2]
print('....Splitting the dataset')
trainDF, testDF = inputDF.randomSplit(SPLIT_SPECS, seed=SPLIT_SEED)
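
`randomSplit` normalizes the weights and assigns every row to exactly one split at random, so the 80/20 proportions are approximate rather than exact. The pure-Python sketch below illustrates the idea with one uniform draw per row compared against cumulative weight bounds. `random_split` is a hypothetical helper; Spark samples per partition with its own random generator, so it will not select the same rows as this sketch.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to exactly one split, with probabilities given by the normalized weights."""
    total = float(sum(weights))
    bounds, cumulative = [], 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)  # e.g. [0.8, 0.2] -> [0.8, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw in [0, 1) per row
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall below 1.0
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(list(range(1000)), [0.8, 0.2], seed=6)
print(len(train_rows), len(test_rows))  # roughly 800 and 200
```

Fixing the seed (as `SPLIT_SEED` does above) makes the split repeatable for the same input, which keeps the evaluation comparable across runs.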

## 2 Feature engineering

Feature engineering (also known as feature extraction or feature discovery) is the process of using domain knowledge to extract features (characteristics, properties, attributes) from raw data. The motivation is to use these extra features to improve the quality of the results of a machine learning process, compared with supplying only the raw data.

#### Q - One-hot encode the categorical columns
* One-hot encode each text categorical column that you identified in the previous lab, using `StringIndexer` first and then `OneHotEncoderEstimator` (renamed `OneHotEncoder` in Spark 3.x)
* Do not fit the transformations yet; instead, keep a list with all the required steps

In [None]:
#INSERT CODE
print('....Data pre-processing: One hot encoding of categorical columns')
CATEGORICAL_COLUMN_LIST = ['gender', 'senior_citizen', 'partner', 'dependents', 'phone_service', 'multiple_lines',
                        'internet_service', 'online_security', 'online_backup', 'device_protection', 'tech_support',
                        'streaming_tv', 'streaming_movies', 'contract', 'paperless_billing', 'payment_method']  
dataPreprocessingStagesList = []
for eachCategoricalColumn in CATEGORICAL_COLUMN_LIST:
    # Map each category string to a numeric index (most frequent category -> 0.0)
    stringIndexer = StringIndexer(inputCol=eachCategoricalColumn, outputCol=eachCategoricalColumn + "Index")
    # OneHotEncoderEstimator (Spark 2.x) was renamed to OneHotEncoder in Spark 3.0, see
    # https://spark.apache.org/docs/3.0.0-preview/ml-migration-guide.html
    if spark.version.startswith("2."):
        from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder
    else:
        from pyspark.ml.feature import OneHotEncoder
    # Turn each index into a 0/1 vector column
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[eachCategoricalColumn + "classVec"])
    dataPreprocessingStagesList += [stringIndexer, encoder]
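
The two stages per column can be illustrated with a small pure-Python sketch. The helpers are hypothetical, not the Spark API: `fit_string_indexer` mimics `StringIndexer`'s default `frequencyDesc` ordering, where the most frequent value gets index 0.0 (ties are broken alphabetically here), and `one_hot` mimics `OneHotEncoder` with its default `dropLast=True`, where the last category is encoded as all zeros. Spark stores the encoded values as sparse vectors; dense lists are used here for readability, and the `contract` values are made-up sample data.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct string to a float index: most frequent first, ties alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a dense 0/1 list.

    With drop_last=True the vector has num_categories - 1 slots and the last
    category is represented by all zeros.
    """
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector


# Made-up sample of the `contract` column
contract = ["Month-to-month", "Two year", "Month-to-month", "One year", "Two year", "Month-to-month"]
index = fit_string_indexer(contract)
print(index)  # {'Month-to-month': 0.0, 'Two year': 1.0, 'One year': 2.0}
for value in ["Month-to-month", "Two year", "One year"]:
    print(value, one_hot(index[value], len(index)))
```

Dropping the last slot avoids a perfectly redundant column, since its value is implied by the other slots being zero.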

#### Q - Generate labels for the output variable (churn)
* Use `StringIndexer` to generate labels for the `churn` column
* Do not fit the transformation yet; instead, append it to the previous list

In [None]:
#INSERT CODE
print('....Data pre-processing: Labels for target column')
labelStringIndexer = StringIndexer(inputCol="churn", outputCol="label")
dataPreprocessingStagesList += [labelStringIndexer]
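
One consequence of `StringIndexer`'s default frequency ordering is that `label = 1.0`, which `BinaryClassificationEvaluator` treats as the positive class, is assigned to whichever `churn` value is less frequent in the training data. The hypothetical helper below mimics two of `StringIndexer`'s `stringOrderType` options on made-up samples. Setting `stringOrderType="alphabetAsc"` on the label indexer would pin `No` to 0.0 and `Yes` to 1.0 regardless of class balance, assuming the column contains exactly those two strings.

```python
from collections import Counter


def index_labels(values, string_order_type="frequencyDesc"):
    """Hypothetical helper mimicking two StringIndexer orderings for a label column."""
    counts = Counter(values)
    if string_order_type == "frequencyDesc":
        ordered = sorted(counts, key=lambda v: (-counts[v], v))  # most frequent -> 0.0
    elif string_order_type == "alphabetAsc":
        ordered = sorted(counts)                                 # alphabetical -> 0.0, 1.0, ...
    else:
        raise ValueError(f"unsupported ordering: {string_order_type}")
    return {value: float(i) for i, value in enumerate(ordered)}


# Made-up churn samples with opposite class balance
mostly_no = ["No"] * 7 + ["Yes"] * 3
mostly_yes = ["No"] * 3 + ["Yes"] * 7
print(index_labels(mostly_no))                  # {'No': 0.0, 'Yes': 1.0}
print(index_labels(mostly_yes))                 # {'Yes': 0.0, 'No': 1.0}
print(index_labels(mostly_yes, "alphabetAsc"))  # {'No': 0.0, 'Yes': 1.0}
```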

#### Q - Assemble all the features into the `features` column
* Use `VectorAssembler` to build a `features` column from the one-hot encoded features plus the numerical features
* Do not fit the transformation yet; instead, append it to the previous list

In [None]:
#INSERT CODE
NUMERIC_COLUMN_LIST = ['monthly_charges', 'total_charges']
print('....Generating features column')
assemblerInputs = NUMERIC_COLUMN_LIST + [c + "classVec" for c in CATEGORICAL_COLUMN_LIST]
featuresVectorAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
dataPreprocessingStagesList += [featuresVectorAssembler]
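
`VectorAssembler` concatenates its input columns, in the order given by `inputCols`, into a single vector: a numeric column contributes one slot and a vector column contributes all of its elements. The pure-Python sketch below uses a hypothetical `assemble` helper on a made-up row. With `dropLast=True`, a categorical column with k distinct values contributes k - 1 slots. Spark's `VectorAssembler` also checks for null and NaN values according to its `handleInvalid` parameter, which this sketch omits.

```python
def assemble(row, input_cols):
    """Concatenate numeric and vector columns, in input_cols order, into one flat list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: every element
        else:
            features.append(float(value))             # numeric column: a single slot
    return features


# Made-up row: two numeric columns plus two one-hot encoded columns (dropLast=True)
row = {
    "monthly_charges": 70.5,
    "total_charges": 1400.0,
    "genderclassVec": [1.0],         # 2 categories -> 1 slot
    "contractclassVec": [0.0, 1.0],  # 3 categories -> 2 slots
}
cols = ["monthly_charges", "total_charges", "genderclassVec", "contractclassVec"]
print(assemble(row, cols))  # [70.5, 1400.0, 1.0, 0.0, 1.0]
```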

## 3 Model training and testing

#### Q - Generate a Random Forest Classifier training step and the training pipeline
* Use `RandomForestClassifier` to define the training step
* Do not fit anything yet; instead, append the training step to a new list
* Generate a training pipeline using `Pipeline` that chains the feature engineering steps and the training step

In [None]:
#INSERT CODE
print('....Model training')
modelTrainingStageList = []
rfClassifier = RandomForestClassifier(labelCol="label", featuresCol="features")
modelTrainingStageList += [rfClassifier]
print('....Generating the pipeline')
pipeline = Pipeline(stages=dataPreprocessingStagesList + modelTrainingStageList) 
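
Conceptually, `Pipeline.fit` walks the stages in order: each estimator (such as `StringIndexer` or `RandomForestClassifier`) is fitted on the output of the stages before it and replaced by the resulting model, while plain transformers (such as `VectorAssembler`) are kept as they are. The toy classes and functions below (`Shift`, `MeanCenterer`, `fit_pipeline`, `transform_pipeline`) are hypothetical and sketch that logic in pure Python. The point to notice is that the statistics used to transform the test data are learned from the training data only.

```python
class Shift:
    """Toy Transformer: adds a constant to one column of every row."""

    def __init__(self, col, delta):
        self.col, self.delta = col, delta

    def transform(self, rows):
        return [{**r, self.col: r[self.col] + self.delta} for r in rows]


class MeanCenterer:
    """Toy Estimator: fit() learns the column mean and returns a fitted Shift."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return Shift(self.col, -mean)


def fit_pipeline(stages, rows):
    """Fit estimators in order, feeding each one the output of the earlier stages."""
    estimator_ids = [i for i, s in enumerate(stages) if hasattr(s, "fit")]
    last_estimator = estimator_ids[-1] if estimator_ids else -1
    fitted, data = [], rows
    for i, stage in enumerate(stages):
        transformer = stage.fit(data) if hasattr(stage, "fit") else stage
        fitted.append(transformer)
        if i < last_estimator:  # transforming is only needed while estimators remain
            data = transformer.transform(data)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order (what a fitted pipeline does on transform)."""
    for transformer in fitted:
        rows = transformer.transform(rows)
    return rows


train_data = [{"x": 1.0}, {"x": 3.0}]
test_data = [{"x": 5.0}]
model = fit_pipeline([Shift("x", 10.0), MeanCenterer("x")], train_data)
print(transform_pipeline(model, test_data))  # [{'x': 3.0}] (training mean after the shift is 12.0)
```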

#### Q - Fit the training pipeline
* Use `fit` to execute the training pipeline over the training dataset

In [None]:
#INSERT CODE
print('....Fit the model')
pipelineModel = pipeline.fit(trainDF)

#### Q - Test the model with the test dataset
* Use `transform` to make predictions with the fitted model on `testDF`

In [None]:
#INSERT CODE
print('....Test the model')
predictionsDF = pipelineModel.transform(testDF)
predictionsDF.show(1, vertical=True)
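
For each row, the fitted classifier adds `rawPrediction`, `probability` and `prediction` columns (the default output column names of Spark ML classifiers). The sketch below shows one simplified way a forest can derive them, assuming each tree outputs a normalized class distribution for the row: sum the distributions (`rawPrediction`), normalize the sum (`probability`), and take the index of the largest entry (`prediction`). `aggregate_tree_outputs` is a hypothetical helper, not a Spark function.

```python
def aggregate_tree_outputs(tree_class_probs):
    """Combine per-tree class distributions into (rawPrediction, probability, prediction)."""
    num_classes = len(tree_class_probs[0])
    raw = [sum(p[c] for p in tree_class_probs) for c in range(num_classes)]  # summed soft votes
    total = sum(raw)
    probability = [r / total for r in raw]                                   # normalized to sum to 1
    prediction = float(max(range(num_classes), key=lambda c: raw[c]))        # index of the top class
    return raw, probability, prediction


# Made-up outputs of four trees for one customer: [P(label=0.0), P(label=1.0)]
trees = [[0.75, 0.25], [0.5, 0.5], [0.25, 0.75], [1.0, 0.0]]
print(aggregate_tree_outputs(trees))  # ([2.5, 1.5], [0.625, 0.375], 0.0)
```

The continuous `rawPrediction` and `probability` columns carry ranking information that the 0/1 `prediction` column discards, which matters for the ROC-based evaluation in the next section.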

## 4 Model evaluation

#### Q - Calculate the area under the ROC curve to assess the quality of the classifier
* Use `BinaryClassificationEvaluator` to calculate the `areaUnderROC` metric
* What do you think about the value?

In [None]:
#INSERT CODE
print('....Calculating area under the ROC curve')
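evaluator = BinaryClassificationEvaluator()
# Score with the continuous rawPrediction column (the default); the 0/1 "prediction"
# column gives only one point on the ROC curve and underestimates the area.
evaluator.setRawPredictionCol("rawPrediction")
evaluator.setLabelCol("label")
value = evaluator.evaluate(predictionsDF, {evaluator.metricName: "areaUnderROC"})
print(f"areaUnderROC: {value:.4f}")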
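
The area under the ROC curve equals the probability that a randomly chosen positive row (churn) receives a higher score than a randomly chosen negative row, with ties counting one half; this pairwise count gives the same value as the trapezoidal area under the ROC curve. The pure-Python sketch below computes it that way on made-up scores and also shows why the score column matters: using hard 0/1 predictions as scores collapses the ROC curve to a single threshold and lowers the AUC. `area_under_roc` is a hypothetical helper.

```python
def area_under_roc(scores, labels):
    """AUC as P(score of a random positive > score of a random negative), ties count 1/2."""
    positives = [s for s, y in zip(scores, labels) if y == 1.0]
    negatives = [s for s, y in zip(scores, labels) if y == 0.0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


labels = [0.0, 0.0, 1.0, 1.0, 0.0, 1.0]
scores = [0.1, 0.4, 0.35, 0.8, 0.2, 0.9]          # made-up churn probabilities
hard = [1.0 if s >= 0.5 else 0.0 for s in scores]  # what a 0/1 prediction column holds
print(round(area_under_roc(scores, labels), 4))  # 0.8889
print(round(area_under_roc(hard, labels), 4))    # 0.8333
```

An AUC of 0.5 corresponds to random ranking and 1.0 to perfect ranking, which gives a reference point for judging the value computed above.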

In [None]:
# Store the metric in a one-row DataFrame
metricsDF = spark.createDataFrame([("areaUnderROC", value)], ["metric_nm", "metric_value"])

In [None]:
metricsDF.show()

**End of LAB 2**