# BANK MARKETING

<br><br>
Membros:
- Anderson Jesus
- Caio Viera
- Pedro Correia



> CRIAÇÃO DE MODELO PREDITIVO

#### Inicializando sessão do Spark

In [1]:
import findspark
findspark.init('/home/labdata/spark-2.2.1-bin-hadoop2.6')

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName('bank').getOrCreate()

#### Carregando os Dados

In [3]:
data = spark.read.csv(
    "hdfs://elephant:8020/user/labdata/bank.csv",
    header=True,
    sep=";",
    inferSchema=True
)

In [4]:
data = data.selectExpr(*["`{}` as {}".format(col, col.replace('.', '_')) for col in data.columns])

#### Preparação dos Dados

Definindo variáveis utilizadas pelo tipo a fim de realizar o encoding necessário

In [5]:
categoricalColumns = [
    'job',
    'marital',
    'education',
    'default',
    'housing',
    'loan',
    'contact',
    'month',
    'day_of_week',
    'poutcome'
]

# não utilizaremos a variável `duration`, que algo não sabido antes da ligação ocorrer
# e portanto, não deve ser válida para fins preditivos
numericColumns = [
    'pdays',
    'previous',
    'emp_var_rate',
    'cons_price_idx',
    'cons_conf_idx',
    'euribor3m',
    'nr_employed'
]

Iniciando a construção do Pipeline

In [6]:
from pyspark.ml import Pipeline

In [7]:
# encoders e indexadores necessários ao tratamento dos dados
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# instanciando nossa lista de passos a serem fornecidos ao pipeline
stages = []

Target

In [8]:
# indexação da variável resposta
indexer = StringIndexer(
    inputCol='y', 
    outputCol='label'
)

stages += [indexer]

Dados Categóricos

In [9]:
# transformações dados categóricos (paralelo pandas: get_dummies)
for categoricalCol in categoricalColumns:
    # nomes para valores [0:n_cats-1]
    indexer = StringIndexer(
        inputCol=categoricalCol, 
        outputCol=categoricalCol+'_index'
    )
    # criando dummies
    encoder = OneHotEncoder(
        inputCol=categoricalCol+'_index',
        outputCol=categoricalCol+'_class_vec'
    )
    # inserindo estágios de transformação
    stages += [indexer, encoder]

Dados Numéricos

In [10]:
# transformando variáveis numéricas para o tipo double
for numericCol in numericColumns:
    data = data.withColumn(numericCol, data[numericCol].cast('double'))

Assembler

In [11]:
# criando assembler, que deixa os dados no formato vetorial 
# demandado pela biblioteca ML do Spark

assembler_inputs = [categoricalCol+'_class_vec' for categoricalCol in categoricalColumns]
assembler_inputs += numericColumns
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

stages += [assembler]

#### Modelagem

*Gradient Boosting Machine*

In [12]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    labelCol="label",
    featuresCol="features",
    predictionCol='prediction',
    maxIter=60,
    stepSize=0.01,
    seed=420
)

stages += [gbt]

In [13]:
pipeline = Pipeline(stages=stages)
pipeline.write().overwrite().save('hdfs://elephant:8020/user/labdata/model/bank-pipeline-model-unfit')

#### Treinando e Validando Modelo

In [14]:
# separação de dados em treino e teste
(trainingData, testData) = data.randomSplit([0.8, 0.2], seed=420)

print('Observações para treino: {}'.format(trainingData.count()))
print('Observações para teste:  {}'.format(testData.count()))

Observações para treino: 32988
Observações para teste:  8200


In [15]:
%%time
pipelineModel = pipeline.fit(trainingData)
pipelineModel.write().overwrite().save('hdfs://elephant:8020/user/labdata/model/bank-pipeline-model')

CPU times: user 62.4 ms, sys: 14.8 ms, total: 77.2 ms
Wall time: 47.9 s


In [16]:
predictions_gbt = pipelineModel.transform(testData)

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction",
    metricName="accuracy"
)

evaluator_auc = BinaryClassificationEvaluator(
    labelCol="label", 
    rawPredictionCol="rawPrediction"
)

accuracy_gbt = evaluator_accuracy.evaluate(predictions_gbt)
print('Accuracy:         {:.4f}'.format(accuracy_gbt))
auc_gbt = evaluator_auc.evaluate(predictions_gbt)
print(f'areaUnderROC:     {auc_gbt:.4f}')

Accuracy:         0.8998
areaUnderROC:     0.7939


In [18]:
predictions_gbt.select('label', 'prediction').createOrReplaceTempView('predictions')

spark.sql("""
SELECT
    round((tp+tn)/(tp+tn+fp+fn), 4) as accuracy,
    round(tp/(tp+fp), 4) as precision,
    round(tp/(tp+fn), 4) as recall
FROM (
    SELECT
        sum(tn) as tn,
        sum(tp) as tp,
        sum(fn) as fn,
        sum(fp) as fp
    FROM (
        SELECT
            case when label = 0 and prediction = 0 then 1 else 0 end as tn,
            case when label = 1 and prediction = 1 then 1 else 0 end as tp,
            case when label = 1 and prediction = 0 then 1 else 0 end as fn,
            case when label = 0 and prediction = 1 then 1 else 0 end as fp
        FROM
            predictions
    )
)
""").show()

+--------+---------+------+
|accuracy|precision|recall|
+--------+---------+------+
|  0.8998|   0.6195|0.2634|
+--------+---------+------+



`GBTClassifier` apresentou maior potencial, apesar de ter um recall ainda muito baixo. 

Na sequência, vamos buscar realizar um resampling da base, buscando deixar a classe positiva mais prevalente.

In [24]:
data_res = data.sampleBy('y', fractions={'yes': 1, 'no': 0.2})
data_res.groupBy('y').count().show()

+---+-----+
|  y|count|
+---+-----+
| no| 7356|
|yes| 4640|
+---+-----+



In [25]:
(trainingData, testData) = data_res.randomSplit([0.8, 0.2], seed=420)

In [26]:
%%time
# criando novo modelo com resampling
pipelineModel_res = pipeline.fit(trainingData)

# salvando o modelo
pipelineModel_res.write().overwrite().save('hdfs://elephant:8020/user/labdata/model/bank-pipeline-model-res')

CPU times: user 55.7 ms, sys: 16.7 ms, total: 72.4 ms
Wall time: 30.2 s


In [27]:
predictions_res = pipelineModel_res.transform(testData)

In [28]:
accuracy_res = evaluator_accuracy.evaluate(predictions_res)
print('Accuracy:         {:.4f}'.format(accuracy_res))

Accuracy:         0.7782


In [29]:
predictions_res.select('label', 'prediction').createOrReplaceTempView('predictions_res')

spark.sql("""
SELECT
    round((tp+tn)/(tp+tn+fp+fn), 4) as accuracy,
    round(tp/(tp+fp), 4) as precision,
    round(tp/(tp+fn), 4) as recall
FROM (
    SELECT
        sum(tn) as tn,
        sum(tp) as tp,
        sum(fn) as fn,
        sum(fp) as fp
    FROM (
        SELECT
            case when label = 0 and prediction = 0 then 1 else 0 end as tn,
            case when label = 1 and prediction = 1 then 1 else 0 end as tp,
            case when label = 1 and prediction = 0 then 1 else 0 end as fn,
            case when label = 0 and prediction = 1 then 1 else 0 end as fp
        FROM
            predictions_res
    )
)
""").show()

+--------+---------+------+
|accuracy|precision|recall|
+--------+---------+------+
|  0.7782|   0.7896|0.5868|
+--------+---------+------+



Como esperávamos, o recall apresentou um valor bem mais interessante, demonstrando o potencial da solução, que certamente poderia ser refinada ainda mais a fim de atingir resultados mais expressivos.

<hr>