# Spark classification model
Here we use the [Customer Sample Dataset](./data) to train a very simple **Classification Model**: a native Spark model that we fit and then track with [MLflow](https://mlflow.org/).

The model is a pipeline with three stages:
* [StringIndexer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html) to map the categorical `UF` field to a numeric label index
* [VectorAssembler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) to merge all the columns used to train the model into a single vector column
* [LogisticRegression Model](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegressionModel.html) to fit a Logistic Regression model that predicts `Churn`
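
To make the first two stages concrete, here is a minimal plain-Python sketch of what they do to a single row. It is not the Spark implementation: `string_index` and `assemble` are hypothetical helpers, the sample `UF` values are made up, and the ordering rule (most frequent label gets index 0, ties broken alphabetically) follows the default `frequencyDesc` behaviour described in recent Spark documentation.

```python
from collections import Counter

def string_index(values):
    """Mimic StringIndexer's default 'frequencyDesc' ordering in plain Python."""
    counts = Counter(values)
    # Most frequent label first; equal frequencies are ordered alphabetically.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def assemble(row, input_cols):
    """Mimic VectorAssembler: concatenate the chosen columns into one list."""
    return [float(row[col]) for col in input_cols]

# Hypothetical sample values, not taken from the Customer dataset.
ufs = ['SP', 'RJ', 'SP', 'MG', 'SP', 'RJ']
mapping = string_index(ufs)

# A reduced row with only two numeric columns, for brevity.
row = {'Idade': 22, 'Saldo': 6438, 'UF': 'SP'}
row['UF_Index'] = mapping[row['UF']]
features = assemble(row, ['Idade', 'Saldo', 'UF_Index'])
```

The resulting `features` list plays the role of the vector column that the Logistic Regression stage consumes.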

Read the Customer Sample Dataset.

In [None]:
Customer = spark.read.parquet('/dbfs/Dataset/Customer')

Select the columns to be used in the training

In [None]:
numeric_columns = ['Idade',
 'RendaMensal',
 'PercentualUtilizacaoLimite',
 'QtdTransacoesNegadas',
 'AnosDeRelacionamentoBanco',
 'JaUsouChequeEspecial',
 'QtdEmprestimos',
 'NumeroAtendimentos',
 'TMA',
 'IndiceSatisfacao',
 'Saldo',
 'CLTV'
]

columns = list(numeric_columns)
columns.append('UF')
columns.append('Churn')

Create the training dataset.

In [None]:
dataset = Customer.select(columns)
display(dataset)

## Train Model

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

indexer = StringIndexer(inputCol='UF', outputCol="UF_Index")

assembler_columns = list(numeric_columns)
assembler_columns.append('UF_Index')
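# Use assembler_columns (numeric columns plus UF_Index) so the indexed UF is part of the features
assembler = VectorAssembler(inputCols=assembler_columns, outputCol="features")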

lr = LogisticRegression(maxIter=10, regParam=0.01, labelCol='Churn')

pipeline = Pipeline(stages=[indexer, assembler, lr])
model = pipeline.fit(dataset)

## Log the Model using MLflow
Finally, we can track the model with the [MLflow](https://mlflow.org/) platform. Feel free to extend this step (e.g. by logging to a named experiment).
The run that logs the fitted model can then be used to deploy it to an inference endpoint.

In [None]:
import mlflow
import mlflow.spark

with mlflow.start_run():
    print(f'Your run_id: {mlflow.active_run().info.run_id}. Please use it to load and deploy your model')
    mlflow.spark.log_model(spark_model=model, artifact_path='model') 
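
The printed run ID is what ties a logged model to a loadable location: MLflow addresses run artifacts with URIs of the form `runs:/<run_id>/<artifact_path>`. As a small sketch, the hypothetical helper `build_model_uri` below (not part of MLflow) builds that URI from a run ID so it does not have to be typed by hand. The run ID in the example is a placeholder.

```python
def build_model_uri(run_id, artifact_path='model'):
    """Build a 'runs:/<run_id>/<artifact_path>' URI for a logged model."""
    if not run_id:
        raise ValueError('run_id must be a non-empty string')
    return f'runs:/{run_id}/{artifact_path}'

# Placeholder run ID for illustration; use the one printed by your own run.
example_uri = build_model_uri('0123456789abcdef')
```

The default `artifact_path='model'` matches the value passed to `log_model` in this notebook.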

### Predict
Send a payload to test the model. We don't need to apply the transformations ourselves, because the loaded model passes the data through every stage defined in the pipeline.

In [None]:
# Load the model using PyFunc (replace the run ID below with the one printed by your own run)
loaded_model = mlflow.pyfunc.load_model('runs:/a2e0a0ce09cc47ed9ee1bcde7874fb83/model')

import json
import pandas as pd

# JSON payload with one value per column (a single customer)
data = '''{
    "Idade": [22],
    "RendaMensal": [1000],
    "PercentualUtilizacaoLimite": [1],
    "QtdTransacoesNegadas": [100],
    "AnosDeRelacionamentoBanco": [12],
    "JaUsouChequeEspecial": [0],
    "QtdEmprestimos": [1],
    "NumeroAtendimentos": [100],
    "TMA": [300],
    "IndiceSatisfacao": [1],
    "Saldo": [6438],
    "CLTV": [71],
    "UF": ["SP"]
}'''

data = pd.DataFrame(json.loads(data))

predictions = loaded_model.predict(data)
predictions
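
Because the pipeline stages read every training column by name, a payload that omits one would likely fail inside `predict`. The sketch below shows one way to catch that earlier. `missing_columns` is a hypothetical helper and the incomplete payload is made up for illustration. The expected column names are the ones selected for training earlier in this notebook, minus the `Churn` label.

```python
import json

# Feature columns the pipeline expects (the label 'Churn' is not part of the payload).
expected_columns = [
    'Idade', 'RendaMensal', 'PercentualUtilizacaoLimite', 'QtdTransacoesNegadas',
    'AnosDeRelacionamentoBanco', 'JaUsouChequeEspecial', 'QtdEmprestimos',
    'NumeroAtendimentos', 'TMA', 'IndiceSatisfacao', 'Saldo', 'CLTV', 'UF',
]

def missing_columns(payload, expected):
    """Return the expected column names that are absent from the payload dict."""
    return [col for col in expected if col not in payload]

# Made-up, deliberately incomplete payload to show the check.
incomplete = json.loads('{"Idade": [22], "Saldo": [6438]}')
missing = missing_columns(incomplete, expected_columns)
if missing:
    print(f'Payload is missing {len(missing)} column(s): {missing}')
```

Running the check before building the DataFrame gives a readable error message instead of a Spark stack trace.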