In [1]:
## usar python 3.8.8
import findspark
findspark.init()

In [2]:
# Importando SparkSession para criar uma sessão do Spark
from pyspark.sql import SparkSession

# Importando funções e tipos de dados SparkSQL
from pyspark.sql import functions as f
from pyspark.sql.types import *

# Importando módulos Spark MLlib
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Importando SparkContext e SparkConf
from pyspark import SparkContext, SparkConf


In [3]:

# Criando uma nova sessão do Spark

# Spark entry point
spark = SparkSession \
    .builder \
    .appName("Feature_engineering-pkdd99-xpeMBA") \
    .getOrCreate()

spark.version


'3.0.0'

In [4]:

def read_df_csv(tabela=str):
    """
    Função para as bases de dados onde retorna no print o 'shape', um breve 'show' e o Scheema das variáveis.
    :param entidade_name: string que referencie o nome da tabela que complete o caminho './dados_originais/{tabela}.csv'. 
    tabela pode ser => trans, account, card, client, disp, district, loan, order 
    :return: DataFrame em pyspark
    """
    path ="C:\\Users\\renat\\Documents\\00_MBA\\PROJETO_APLICADO\\ML-predict-loan-MBA-applied-project\\dados_tratados"
    df = spark.read.csv(path = f'{path}/{tabela}.csv', header='True',inferSchema='False', sep=';')
    print('\n','A base de dados possui:',df.count(), 'linhas', 'e', len(df.columns), 'colunas', '\n')
    print(df.show(5))
    print(df.printSchema())
    return(df)

In [5]:
trans_df = read_df_csv('trans')


 A base de dados possui: 1056320 linhas e 10 colunas 

+--------+----------+----------+------+--------------+------+-------+--------+----+-------+
|trans_id|account_id|      date|  type|     operation|amount|balance|category|bank|account|
+--------+----------+----------+------+--------------+------+-------+--------+----+-------+
|  695247|      2378|1993-01-01|credit|credit_in_cash| 700.0|  700.0|    null|null|   null|
|  171812|       576|1993-01-01|credit|credit_in_cash| 900.0|  900.0|    null|null|   null|
|  207264|       704|1993-01-01|credit|credit_in_cash|1000.0| 1000.0|    null|null|   null|
| 1117247|      3818|1993-01-01|credit|credit_in_cash| 600.0|  600.0|    null|null|   null|
|  579373|      1972|1993-01-02|credit|credit_in_cash| 400.0|  400.0|    null|null|   null|
+--------+----------+----------+------+--------------+------+-------+--------+----+-------+
only showing top 5 rows

None
root
 |-- trans_id: string (nullable = true)
 |-- account_id: string (nullable = true)

In [6]:

def read_df_csv(tabela=str):
    """
    Função para as bases de dados onde retorna no print o 'shape', um breve 'show' e o Scheema das variáveis.
    :param entidade_name: string que referencie o nome da tabela que complete o caminho './dados_originais/{tabela}.csv'. 
    tabela pode ser => account, card, client, disp, district, loan, order 
    :return: DataFrame em pyspark
    """
    path ="C:\\Users\\renat\\Documents\\00_MBA\\PROJETO_APLICADO\\ML-predict-loan-MBA-applied-project\\dados_mergeados"
    df = spark.read.csv(path = f'{path}', header='True',inferSchema='False', sep=',')
    print('\n','A base de dados possui:',df.count(), 'linhas', 'e', len(df.columns), 'colunas', '\n')
    print(df.show(5))
    print(df.printSchema())
    return(df)

In [7]:
features = read_df_csv()


 A base de dados possui: 827 linhas e 20 colunas 

+-------+---------+---------------+----------------+--------+----------+---------+-------+----------+----------+------+--------+--------+------+----------+------------------+------+-------+---------+----------+
|disp_id|client_id|account_id_acct|district_id_bank|stmt_frq| date_acct|type_disp|loan_id|account_id| date_loan|amount|duration|payments|status|date_birth|district_id_client|gender|card_id|type_card| date_card|
+-------+---------+---------------+----------------+--------+----------+---------+-------+----------+----------+------+--------+--------+------+----------+------------------+------+-------+---------+----------+
|  12430|    12738|          10351|              23| monthly|1995-05-04|    owner|   7115|     10351|1997-03-04| 88704|      48|  1848.0|     C|1960-10-29|                23|     F|   null|     null|      null|
|  12431|    12739|          10351|              23| monthly|1995-05-04|disponent|   7115|     10351|199

In [8]:
def addcols_monbalstats(features,trans_df,M):
    
    trans_acctdate = trans_df.join(features.select('account_id','date_loan'),on='account_id',how='inner')
    
    # # descarte as transações que ocorreram após a data do empréstimo:
    trans_acctdate = trans_acctdate.filter(f.col('date')<f.col('date_loan'))

    # reduce to transactions with M*30days of the loan date (ie datediff < M*30):
    trans_acctdate = trans_acctdate.withColumn('datediff', f.datediff('date_loan', 'date'))
    trans_acctdate = trans_acctdate.filter(f.col('datediff') > M*30)

    monbalstats = trans_acctdate.groupBy('account_id').agg(f.min('balance'),f.max('balance'),f.mean('balance')) ### TENTAR COLOCAR UM ROUND AQUI 

    monbalstats = monbalstats \
    .withColumnRenamed('min(balance)', 'min{}'.format(M)) \
    .withColumnRenamed('max(balance)', 'max{}'.format(M)) \
    .withColumnRenamed('avg(balance)', 'mean{}'.format(M))

    features = features.join(monbalstats,on='account_id',how='left')
    
    return features

In [9]:
features = addcols_monbalstats(features,trans_df,1)
features = addcols_monbalstats(features,trans_df,2)
features = addcols_monbalstats(features,trans_df,3)
features = addcols_monbalstats(features,trans_df,4)
features = addcols_monbalstats(features,trans_df,5)
features = addcols_monbalstats(features,trans_df,6)

In [10]:
#features.show()

In [11]:
#features.select('min1','min2','min3','min4','min5','min6').show()

#+--------+--------+--------+--------+--------+--------+
#|    min1|    min2|    min3|    min4|    min5|    min6|
#+--------+--------+--------+--------+--------+--------+
#| 11853.6| 12052.4| 12052.4| 13704.8| 13704.8| 13704.8|
#| 11853.6| 12052.4| 12052.4| 13704.8| 13704.8| 13704.8|
#| 18196.0| 18196.0| 18196.0| 18196.0| 18196.0|   800.0|
#|100282.7|100282.7|100282.7|100282.7|100282.7|100282.7|
#| 10438.0| 10438.0| 10438.0| 10438.0| 10438.0| 10438.0|
#| 10438.0| 10438.0| 10438.0| 10438.0| 10438.0| 10438.0|
#| 17792.0| 17792.0| 17792.0| 17792.0| 17792.0| 17792.0|
#| 17792.0| 17792.0| 17792.0| 17792.0| 17792.0| 17792.0|
#|116809.2|116809.2|116809.2|116809.2|116809.2|116809.2|
#|116809.2|116809.2|116809.2|116809.2|116809.2|116809.2|
#| 27955.0| 27955.0| 27955.0|   800.0|    null|    null|
#|  1000.0|  1000.0|  1000.0|  1000.0|  1000.0|  1000.0|
#|106530.1|106530.1|106530.1|107258.7|107258.7|107258.7|
#|106530.1|106530.1|106530.1|107258.7|107258.7|107258.7|
#|  1100.0|  1100.0|  1100.0|  1100.0|  1100.0|  1100.0|
#| -1033.8|  -634.4| 10872.5| 10872.5| 11200.0| 11200.0|
#| 12000.0| 12000.0| 12000.0| 12000.0| 12000.0| 12000.0|
#| 14858.0| 14858.0| 14858.0| 14858.0| 14858.0| 14858.0|
#| 14858.0| 14858.0| 14858.0| 14858.0| 14858.0| 14858.0|
#|  1000.0|  1000.0|  1000.0|  1000.0|  1000.0|  1000.0|
#+--------+--------+--------+--------+--------+--------+

Em seguida, devemos identificar nossa variável de resposta (alvo), o status do empréstimo. Vamos verificar as contagens dos quatro status possíveis. Vamos agrupar os status A e C como status de empréstimo "bons" e B e D como status "ruins". No futuro, podemos tentar refazer a classificação usando apenas B (padrão) ou D (inadimplência) como "ruim" e ver como o modelo se compara. De qualquer forma, observando as contagens para cada um desses status, vemos que precisamos considerar a questão do balanceamento dos dados para a classificação.

In [12]:
#print(features.groupBy('status').count().show())
#+------+-----+
#|status|count|
#+------+-----+
#|     B|   31|
#|     D|   45|
#|     C|  493|
#|     A|  258|
#+------+-----+


Códigos de status do empréstimo:

A = contrato finalizado, sem problemas

B = contrato encerrado, empréstimo não pago

C = contrato em execução, OK até agora

D = contrato em execução, cliente em dívida

Agora configuramos as coisas para lidar com as variáveis categóricas. Primeiro, vamos converter essas variáveis categóricas binárias em 0s e 1s:

In [13]:
# Convert response var `status` = {A,B,C,D} to `response` = {0,1} (AC good, BD bad):

features = features.withColumn('response', f.when(f.col('status').isin(['A','C']), 1).otherwise(features.status)) # good
features = features.withColumn('response', f.when(f.col('status').isin(['B','D']), 0).otherwise(features.response)) # bad

# Convert gender={M,F} to gender={0,1}:
features = (
    features
    .withColumn('gender',
                 f.when(f.col('gender')=='F', 1).otherwise(0) 
                 )
)

#features.groupBy('gender').count().show()
#+------+-----+
#|gender|count|
#+------+-----+
#|     1|  417|
#|     0|  410|
#+------+-----+
#


In [14]:

# Existem recursos de cartão de crédito, mas nem todos os clientes têm cartões, portanto, esses recursos podem ser Nan,
# que não é aceitável na modelagem. Vamos criar um recurso `has_card`={0,1}, solte o
# data em que o cartão foi aberto, e abaixo ainda usaremos o recurso type_card de certa forma
# que evita NaNs.

features =  (
    features
    .withColumn('has_card', 
                f.when(f.col('date_card').isNotNull(),1).otherwise(0)
                )
    .drop('date_card')
                )


In [15]:

features = features.withColumn("idade", f.floor(f.datediff(f.to_date("date_loan", "yyyy-MM-dd"), f.to_date("date_birth", "yyyy-MM-dd"))/365))
#features.select('idade').summary().show()
#+-------+------------------+
#|summary|             idade|
#+-------+------------------+
#|  count|               827|
#|   mean| 37.85368802902055|
#| stddev|13.089711698987298|
#|    min|                 7|
#|    25%|                27|
#|    50%|                37|
#|    75%|                49|
#|    max|                63|
#+-------+------------------+

In [16]:

features = features.withColumn("days_between", f.datediff(f.to_date("date_loan", "yyyy-MM-dd"), f.to_date("date_acct", "yyyy-MM-dd")))
#features.select('days_between').summary().show()
#+-------+------------------+
#|summary|      days_between|
#+-------+------------------+
#|  count|               827|
#|   mean| 400.4570737605804|
#| stddev|164.90081113645462|
#|    min|               102|
#|    25%|               267|
#|    50%|               398|
#|    75%|               533|
#|    max|               697|
#+-------+------------------+

In [17]:
#print(features.count(), len(features.columns))
# 827 41

In [18]:
caminho = 'C:\\Users\\renat\\Documents\\00_MBA\\PROJETO_APLICADO\\ML-predict-loan-MBA-applied-project\\dados_features'

In [22]:
#features.show()

+----------+-------+---------+---------------+----------------+--------+----------+---------+-------+----------+------+--------+--------+------+----------+------------------+------+-------+---------+--------+-------+------------------+--------+-------+------------------+--------+-------+------------------+--------+-------+------------------+--------+-------+------------------+--------+-------+------------------+--------+--------+-----+------------+
|account_id|disp_id|client_id|account_id_acct|district_id_bank|stmt_frq| date_acct|type_disp|loan_id| date_loan|amount|duration|payments|status|date_birth|district_id_client|gender|card_id|type_card|    min1|   max1|             mean1|    min2|   max2|             mean2|    min3|   max3|             mean3|    min4|   max4|             mean4|    min5|   max5|             mean5|    min6|   max6|             mean6|response|has_card|idade|days_between|
+----------+-------+---------+---------------+----------------+--------+----------+---------+-

In [28]:
features.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save(f"{caminho}")  