# BANK MARKETING

<br><br>
Membros:
- Anderson Jesus
- Caio Viera
- Pedro Correia



> CRIAÇÃO DE MODELO PREDITIVO

#### Inicializando sessão do Spark

In [1]:
import findspark
findspark.init('/home/labdata/spark-2.2.1-bin-hadoop2.6')

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName('bank').getOrCreate()

#### Carregando os Dados

In [3]:
data = spark.read.csv(
    "hdfs://elephant:8020/user/labdata/bank-historical-data.csv",
    header=True,
    sep=",",
    inferSchema=True
)

In [4]:
data = data.selectExpr(*["`{}` as {}".format(col, col.replace('.', '_')) for col in data.columns])

In [5]:
data.head()

Row(age=35, job='admin.', marital='single', education='university.degree', default='no', housing='yes', loan='no', contact='cellular', month='may', day_of_week='tue', duration=255, campaign=2, pdays=999, previous=1, poutcome='failure', emp_var_rate=-1.8, cons_price_idx=92.89299999999999, cons_conf_idx=-46.2, euribor3m=1.291, nr_employed=5099.1, y='no', client_id='TRZK1')

#### Preparação dos Dados

Definindo variáveis utilizadas pelo tipo a fim de realizar o encoding necessário

In [6]:
categoricalColumns = [
    'job',
    'marital',
    'education',
    'default',
    'housing',
    'loan',
    'contact',
    'month',
    'day_of_week',
    'poutcome'
]

# não utilizaremos a variável `duration`, que algo não sabido antes da ligação ocorrer
# e portanto, não deve ser válida para fins preditivos
numericColumns = [
    'pdays',
    'previous',
    'emp_var_rate',
    'cons_price_idx',
    'cons_conf_idx',
    'euribor3m',
    'nr_employed'
]

Iniciando a construção do Pipeline

In [7]:
from pyspark.ml import Pipeline

In [8]:
# encoders e indexadores necessários ao tratamento dos dados
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# instanciando nossa lista de passos a serem fornecidos ao pipeline
stages = []

Target

In [9]:
# indexação da variável resposta
indexer = StringIndexer(
    inputCol='y', 
    outputCol='label'
).setHandleInvalid("skip")

stages += [indexer]

Dados Categóricos

In [10]:
# transformações dados categóricos (paralelo pandas: get_dummies)
for categoricalCol in categoricalColumns:
    # nomes para valores [0:n_cats-1]
    indexer = StringIndexer(
        inputCol=categoricalCol, 
        outputCol=categoricalCol+'_index'
    ).setHandleInvalid("skip")
    # criando dummies
    encoder = OneHotEncoder(
        inputCol=categoricalCol+'_index',
        outputCol=categoricalCol+'_class_vec'
    )
    # inserindo estágios de transformação
    stages += [indexer, encoder]

Dados Numéricos

In [11]:
# transformando variáveis numéricas para o tipo double
for numericCol in numericColumns:
    data = data.withColumn(numericCol, data[numericCol].cast('double'))

Assembler

In [12]:
# criando assembler, que deixa os dados no formato vetorial 
# demandado pela biblioteca ML do Spark

assembler_inputs = [categoricalCol+'_class_vec' for categoricalCol in categoricalColumns]
assembler_inputs += numericColumns
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

stages += [assembler]

#### Modelagem

*Gradient Boosting Machine*

In [13]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    labelCol="label",
    featuresCol="features",
    predictionCol='prediction',
    maxIter=60,
    stepSize=0.01,
    seed=420
)

stages += [gbt]

In [14]:
pipeline = Pipeline(stages=stages)
pipeline.write().overwrite().save('hdfs://elephant:8020/user/labdata/model2/bank-pipeline-model-unfit')

#### Treinando e Validando Modelo

In [15]:
# separação de dados em treino e teste
(trainingData, testData) = data.randomSplit([0.8, 0.2], seed=420)

print('Observações para treino: {:>7}'.format(trainingData.count()))
print('Observações para teste:  {:>7}'.format(testData.count()))

Observações para treino:   26389
Observações para teste:     6561


In [16]:
%%time
pipelineModel = pipeline.fit(trainingData)
pipelineModel.write().overwrite().save('hdfs://elephant:8020/user/labdata/model2/bank-pipeline-model')

CPU times: user 85.7 ms, sys: 20.4 ms, total: 106 ms
Wall time: 1min 4s


In [17]:
predictions_gbt = pipelineModel.transform(testData)

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction",
    metricName="accuracy"
)

evaluator_auc = BinaryClassificationEvaluator(
    labelCol="label", 
    rawPredictionCol="rawPrediction"
)

accuracy_gbt = evaluator_accuracy.evaluate(predictions_gbt)
print('accuracy:         {:.4f}'.format(accuracy_gbt))
auc_gbt = evaluator_auc.evaluate(predictions_gbt)
print(f'areaUnderROC:     {auc_gbt:.4f}')

accuracy:         0.9032
areaUnderROC:     0.7881


In [19]:
predictions_gbt.select('label', 'prediction').createOrReplaceTempView('predictions')

spark.sql("""
SELECT
    round((tp+tn)/(tp+tn+fp+fn), 4) as accuracy,
    round(tp/(tp+fp), 4) as precision,
    round(tp/(tp+fn), 4) as recall
FROM (
    SELECT
        sum(tn) as tn,
        sum(tp) as tp,
        sum(fn) as fn,
        sum(fp) as fp
    FROM (
        SELECT
            case when label = 0 and prediction = 0 then 1 else 0 end as tn,
            case when label = 1 and prediction = 1 then 1 else 0 end as tp,
            case when label = 1 and prediction = 0 then 1 else 0 end as fn,
            case when label = 0 and prediction = 1 then 1 else 0 end as fp
        FROM
            predictions
    )
)
""").show()

+--------+---------+------+
|accuracy|precision|recall|
+--------+---------+------+
|  0.9032|   0.6656|0.2805|
+--------+---------+------+



`GBTClassifier` apresentou maior potencial, apesar de ter um recall ainda muito baixo. 

Na sequência, vamos buscar realizar um resampling da base, buscando deixar a classe positiva mais prevalente.

In [20]:
data_res = data.sampleBy('y', fractions={'yes': 1, 'no': 0.2})
data_res.groupBy('y').count().show()

+---+-----+
|  y|count|
+---+-----+
| no| 5884|
|yes| 3713|
+---+-----+



In [21]:
(trainingData, testData) = data_res.randomSplit([0.8, 0.2], seed=420)

print('Observações para treino: {:>7}'.format(trainingData.count()))
print('Observações para teste:  {:>7}'.format(testData.count()))

Observações para treino:    7684
Observações para teste:     1913


In [22]:
%%time
# criando novo modelo com resampling
pipelineModel_res = pipeline.fit(trainingData)

# salvando o modelo
pipelineModel_res.write().overwrite().save('hdfs://elephant:8020/user/labdata/model2/bank-pipeline-model-res')

CPU times: user 85.8 ms, sys: 20.3 ms, total: 106 ms
Wall time: 40.8 s


In [23]:
predictions_res = pipelineModel_res.transform(testData)

In [24]:
predictions_res.select('client_id', 'label', 'probability', 'prediction').show(10)

+---------+-----+--------------------+----------+
|client_id|label|         probability|prediction|
+---------+-----+--------------------+----------+
|    99E3E|  1.0|[0.26822972896096...|       1.0|
|    LYE2O|  1.0|[0.10345855935561...|       1.0|
|    HWDG5|  0.0|[0.23828302553751...|       1.0|
|    458Y0|  1.0|[0.09527461398786...|       1.0|
|    ZMB0S|  0.0|[0.21945796379139...|       1.0|
|    KW4K4|  1.0|[0.47716439150218...|       1.0|
|    G5PES|  0.0|[0.47716439150218...|       1.0|
|    QVAU3|  1.0|[0.55234617784506...|       0.0|
|    EC5H0|  1.0|[0.47716439150218...|       1.0|
|    PMII3|  1.0|[0.43866316142809...|       1.0|
+---------+-----+--------------------+----------+
only showing top 10 rows



In [25]:
accuracy_res = evaluator_accuracy.evaluate(predictions_res)
print('Accuracy:         {:.4f}'.format(accuracy_res))
auc_gbt = evaluator_auc.evaluate(predictions_gbt)
print(f'areaUnderROC:     {auc_gbt:.4f}')

Accuracy:         0.7799
areaUnderROC:     0.7881


In [26]:
predictions_res.select('label', 'prediction').createOrReplaceTempView('predictions_res')

spark.sql("""
SELECT
    round((tp+tn)/(tp+tn+fp+fn), 4) as accuracy,
    round(tp/(tp+fp), 4) as precision,
    round(tp/(tp+fn), 4) as recall
FROM (
    SELECT
        sum(tn) as tn,
        sum(tp) as tp,
        sum(fn) as fn,
        sum(fp) as fp
    FROM (
        SELECT
            case when label = 0 and prediction = 0 then 1 else 0 end as tn,
            case when label = 1 and prediction = 1 then 1 else 0 end as tp,
            case when label = 1 and prediction = 0 then 1 else 0 end as fn,
            case when label = 0 and prediction = 1 then 1 else 0 end as fp
        FROM
            predictions_res
    )
)
""").show()

+--------+---------+------+
|accuracy|precision|recall|
+--------+---------+------+
|  0.7799|   0.7856|0.6074|
+--------+---------+------+



Como esperávamos, o recall apresentou um valor bem mais interessante, demonstrando o potencial da solução, que certamente poderia ser refinada ainda mais a fim de atingir resultados mais expressivos.

<hr>

In [31]:
predictions_res.select('probability', 'prediction', 'label').collect()

[Row(probability=DenseVector([0.2682, 0.7318]), prediction=1.0, label=1.0),
 Row(probability=DenseVector([0.1035, 0.8965]), prediction=1.0, label=1.0),
 Row(probability=DenseVector([0.2383, 0.7617]), prediction=1.0, label=0.0),
 Row(probability=DenseVector([0.0953, 0.9047]), prediction=1.0, label=1.0),
 Row(probability=DenseVector([0.2195, 0.7805]), prediction=1.0, label=0.0),
 Row(probability=DenseVector([0.4772, 0.5228]), prediction=1.0, label=1.0),
 Row(probability=DenseVector([0.4772, 0.5228]), prediction=1.0, label=0.0),
 Row(probability=DenseVector([0.5523, 0.4477]), prediction=0.0, label=1.0),
 Row(probability=DenseVector([0.4772, 0.5228]), prediction=1.0, label=1.0),
 Row(probability=DenseVector([0.4387, 0.5613]), prediction=1.0, label=1.0),
 Row(probability=DenseVector([0.098, 0.902]), prediction=1.0, label=1.0),
 Row(probability=DenseVector([0.1939, 0.8061]), prediction=1.0, label=1.0),
 Row(probability=DenseVector([0.4537, 0.5463]), prediction=1.0, label=1.0),
 Row(probabili

In [35]:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

positiveProbability = F.udf(lambda v: float(v[1]), FloatType())
predictions_res.select('prediction', 'label', positiveProbability('probability').alias('score')).show()

+----------+-----+----------+
|prediction|label|     score|
+----------+-----+----------+
|       1.0|  1.0| 0.7317703|
|       1.0|  1.0| 0.8965414|
|       1.0|  0.0|0.76171696|
|       1.0|  1.0| 0.9047254|
|       1.0|  0.0|  0.780542|
|       1.0|  1.0| 0.5228356|
|       1.0|  0.0| 0.5228356|
|       0.0|  1.0|0.44765383|
|       1.0|  1.0| 0.5228356|
|       1.0|  1.0| 0.5613368|
|       1.0|  1.0| 0.9020499|
|       1.0|  1.0|0.80613846|
|       1.0|  1.0| 0.5462914|
|       0.0|  1.0| 0.4138977|
|       0.0|  0.0|0.37635377|
|       1.0|  1.0| 0.7223718|
|       0.0|  0.0|0.26796854|
|       0.0|  0.0|0.24748759|
|       0.0|  0.0|0.27659518|
|       1.0|  1.0| 0.5169685|
+----------+-----+----------+
only showing top 20 rows

