In [None]:
displayHTML('<div style="text-align:center"><img src ="https://github.com/romulomadu/PEDS/blob/master/algebra/tarefas/logos.png?raw=true" /></div>')

# Churn Customer Analysis

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Leitura dos Dados

Os dados foram ingeridos no HDFS com o nome `Churn_Modelling.csv` e então são lidos pelo Spark no formato de `DataFrame`.

In [None]:
# File location and type
file_location = "/FileStore/tables/Churn_Modelling.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_ = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

#df_.printSchema()

In [None]:
numeric_features = [t[0] for t in df_.dtypes if t[1] == 'int']
numeric_data = df_.select(numeric_features).toPandas()
n = len(numeric_data.columns)
df = df_.select(
 'CreditScore',
 'Geography',
 'Gender',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary',
 'Exited')
cols = df.columns

# Preparação dos Dados

Uma vez que os dados estão carregados no Spark, vamos utilizar seu poder de processamento distribuído para realizar operações no `DataFrame` para preparar os dados para a modelagem do problema.

### Pipeline

O *Spark* permite utilizar o método `Pipeline` para encadear operações de transformação dos dados, para isso faz uso da estrutura de gráfico direcional acíclio (DAG), desta forma foi possível escalonar as seguintes transformações nos dados:

* Conversão das variáveis categóricas (*Geography*) para variáveis dummy, utilizando o método `OneHotEncoder`
* Formatação dos dados criando as colunas `features` e `label` para adequar o formato dos dados para usar ocmo entrada no *SparkML*

A vantagem de se utilizar o `Pipeline` se deve a capacidade de reaproveitamento das mesmas transformações para novos registros.

In [None]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ['Geography', 'Gender', 'HasCrCard', 'IsActiveMember']#, 'Exited']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'Exited', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['CreditScore',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'EstimatedSalary']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [None]:
cols = ['CreditScore',
 'Geography',
 'Gender',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary']

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)

# Modelo de Classificação

### Holdout

Uma boa prática na modelagem de problemas de aprendizado de máquina é separar o conjunto de dados em dados de treino, que serão utlizados pelo algoritmo para aprender, e dados de teste, que serão usados para avaliar a performance do modelo. Essa técnica é denominada *Holdout* e pode-se implementá-la no *Spark* utilizano o método `randomSplit` do  `DataFrame`

In [None]:
train, test = df.randomSplit([0.7, 0.3], seed = 2018)

### Regressão Logística

A regressão logística é uma técnica de aprendizado supervisionado para classificação que consiste na minimização na seguinte função de custo

$$J(w) = \frac{1}{m}\[ \sum \limits_{x=1}^{m} y^{(i)} \log h_w (x^{(i)}) + (1-y^{(i)}) \log(1-h_w (x^{(i)})) \]$$

In [None]:

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

In [None]:
import matplotlib.pyplot as plt
plt.clf()
import numpy as np
beta = np.sort(lrModel.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
#display(plt.show())

In [None]:
#display(train.collect())

In [None]:
plt.clf()
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
#display(plt.show())
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

Avaliação do modelo

In [None]:
plt.clf()
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
#display(plt.show())

In [None]:
predictions = lrModel.transform(test)
#predictions.select('Age', 'rawPrediction', 'prediction', 'probability').show(10)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

In [None]:
result = predictions.toPandas()
y_pred = result.prediction
y_test = result.label
from sklearn.metrics import roc_auc_score, recall_score, classification_report, confusion_matrix, accuracy_score
print(classification_report(y_test, y_pred))
print(accuracy_score(y_test, y_pred))

### Árvores de Decisão

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
#predictions.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

In [None]:
result = predictions.toPandas()
y_pred = result.prediction
y_test = result.label
from sklearn.metrics import roc_auc_score, recall_score, classification_report, confusion_matrix, accuracy_score
print(classification_report(y_test, y_pred))
print(accuracy_score(y_test, y_pred))

In [None]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

### Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
#predictions.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

In [None]:
result = predictions.toPandas()
y_pred = result.prediction
y_test = result.label
from sklearn.metrics import roc_auc_score, recall_score, classification_report, confusion_matrix, accuracy_score
print(classification_report(y_test, y_pred))
print(accuracy_score(y_test, y_pred))

### Gradient Boosting

In [None]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
#predictions.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

In [None]:
result = predictions.toPandas()
y_pred = result.prediction
y_test = result.label
from sklearn.metrics import roc_auc_score, recall_score, classification_report, confusion_matrix, accuracy_score
print(classification_report(y_test, y_pred))
print(accuracy_score(y_test, y_pred))

In [None]:
dbutils.widgets.text(name='CustId_input', defaultValue='15634602', label='CustId')
custid_input= int(getArgument("CustId_input"))
plt.clf()
input_df = df_.filter('CustomerId == {}'.format(custid_input))
input_df = input_df.drop('CustomerId', 'RowNumber', 'Surname')
input_df = pipelineModel.transform(input_df)
#selectedCols = ['label', 'features'] + cols
#input_df2 = input_df.select(selectedCols)
pred_input = rfModel.transform(input_df)
prob_exit_input = round((pred_input.select('probability').toPandas()['probability'].loc[0][1])*100,2)
strategy = int(prob_exit_input/20) + 1
if strategy == 1:
  message = 'Não oferecer taxas diferenciadas'
elif strategy > 1:
  message = 'Ofereça taxas diferenciadas de nível {}'.format(strategy)

displayHTML("""<center><h1>O cliente tem {}% de chances de sair do Banco</h1>
            <h3>{}</h3>
            </center>
            """.format(prob_exit_input, message))