# Setup do ambiente para utilização do Spark

In [2]:
# Importando a biblioteca os
import os
import sys

In [3]:
# Instalação e configuração de variaveis ambiente para utilizar Google Drive
# Se utilizar o Colab ajustar para True para instalação dos pre-requisitos
colab = True

if colab==True:
    from google.colab import drive
    drive.mount('/content/drive')

    # Instalação de requisitos
    !apt-get update # Update apt-get repository.
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
    !wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
    !tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
    !pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

    # Definindo a variável de ambiente do Java
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    # Definindo a variável de ambiente do Spark
    #os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
    os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

Mounted at /content/drive
Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:6 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1,357 kB]
Get:8 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 Packages [1,920 kB]
Hit:9 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Fetched 3,510 kB in 3s (1,348 kB/s)
R

In [4]:
# Configuration variables
# Base directory of the data files; every input and output path in this
# notebook is built by appending a file name to it (hence the trailing slash).

# Local PC
#dir_base = "data/"
# Google Drive
dir_base = "/content/drive/MyDrive/jupyter/pcd_0124_analise_de_credito/data/"

In [5]:
# Library imports
import findspark

# Initialise findspark so that PySpark becomes importable
findspark.init()

from pyspark.sql import SparkSession

# Make Spark use the interpreter that runs this notebook. These variables are
# read when the session starts, so they have to be set BEFORE getOrCreate();
# setting them afterwards has no effect on the session created here.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = (
    SparkSession.builder
    .appName("Aplicação Spark - Buerau")
    .getOrCreate()
)


# Leitura dos dados

In [6]:
# Read the bureau data file.
# inferSchema=True lets Spark parse numeric columns as numbers. Without it every
# column is loaded as a string, so later max/min aggregations and ORDER BY
# compare text lexicographically instead of numerically.
data_ac = spark.read.csv(dir_base + "bureau.csv", header=True, inferSchema=True)

In [7]:
# Preview the first 10 rows of the raw bureau data
data_ac.show(10)

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|    CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|    215354|     5714462|       Closed|     currency 1|       -497|                 0|             -153.0|           -153

# Criação de Flags para auxílio na visão temporal dos dados

In [8]:
# Look-back windows (in days) used to build the temporal flags.
max_value = 360  # DAYS_CREDIT is expressed in days; 360 days = 12 months
interval = 360   # step, in days, between the generated windows
column_name = 'DAYS_CREDIT'

# Hand-picked windows (90 and 180 days) followed by every multiple of
# `interval` up to `max_value`; the zero-day window is not generated.
list_value = [90, 180] + list(range(interval, max_value + 1, interval))

def case_when_flags_days(list_value, column_name):
    """Build the SQL CASE WHEN expressions that flag rows inside each look-back window.

    For every window size ``d`` in ``list_value`` one expression is generated::

        CASE WHEN <column_name> >= -d THEN 1 ELSE 0 END AS ultimos_<int(d/30)>_meses

    Args:
        list_value: window sizes, in days.
        column_name: name of the column compared against ``-d``.

    Returns:
        A two-element list: the comma-separated SQL fragment (an empty string
        when ``list_value`` is empty) and the list of generated flag column
        names, in the same order as ``list_value``.
    """
    list_columns_flag = []
    case_clauses = []
    for days in list_value:
        flag_name = "ultimos_" + str(int(days/30)) + "_meses"
        list_columns_flag.append(flag_name)
        case_clauses.append(
            "CASE WHEN " + column_name + " >= -" + str(days) + " THEN 1 ELSE 0 END AS " + flag_name
        )

    # Join by position instead of comparing each value with the last one: a value
    # equal to the last element appearing earlier in the list used to lose its
    # separator, producing invalid SQL.
    return [", ".join(case_clauses), list_columns_flag]

# Generate the flag expressions, then split the result into the SQL fragment
# and the names of the flag columns it creates.
list_case = case_when_flags_days(list_value, column_name)
list_case_query, list_flags_columns = list_case

In [9]:
# Expose the raw bureau data to Spark SQL
data_ac.createOrReplaceTempView("data_ac")

# SQL query adding the temporal flag columns to every bureau row
spark_query = f"""
SELECT *, {list_case_query}
    FROM data_ac
    ORDER BY SK_ID_BUREAU
"""

df_tmp_01 = spark.sql(spark_query)

# Make the flagged dataset available to the next SQL step
df_tmp_01.createOrReplaceTempView("df_tmp_01")

In [10]:
# Preview the first 10 rows, now including the temporal flag columns
df_tmp_01.show(10)

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------------+---------------+----------------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|         CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------

In [11]:
# List the distinct CREDIT_ACTIVE values present in the raw data
spark_query = """
SELECT distinct CREDIT_ACTIVE FROM data_ac
"""
data_ac.createOrReplaceTempView("data_ac")

check_CREDIT_ACTIVE= spark.sql(spark_query)

# Register the check under its own view name. It used to be registered as
# "df_tmp_01", which silently replaced the view holding the flagged dataset.
check_CREDIT_ACTIVE.createOrReplaceTempView("check_credit_active")
check_CREDIT_ACTIVE.show()

+-------------+
|CREDIT_ACTIVE|
+-------------+
|     Bad debt|
|         Sold|
|       Active|
|       Closed|
+-------------+



In [12]:
# Status of the credits reported by the Credit Bureau (CB):
# one 0/1 indicator column per CREDIT_ACTIVE value.
credit_active_query = """
SELECT *,
    CASE
        WHEN CREDIT_ACTIVE = "Closed" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_CLOSED,
    CASE
        WHEN CREDIT_ACTIVE = "Active" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_ACTIVE,
    CASE
        WHEN CREDIT_ACTIVE = "Sold" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_SOLD,
    CASE
        WHEN CREDIT_ACTIVE = "Bad debt" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_BAD_DEBT
FROM  df_tmp_01
ORDER BY `SK_ID_BUREAU`
"""

# (Re)register the input view right before querying it
df_tmp_01.createOrReplaceTempView("df_tmp_01")
df_tmp_02 = spark.sql(credit_active_query)

df_tmp_02.createOrReplaceTempView('df_tmp_02')

In [13]:
# Preview the first 10 rows, now including the CREDIT_ACTIVE indicator columns
df_tmp_02.show(10)

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------------+---------------+----------------+--------------------+--------------------+------------------+----------------------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|         CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|CREDIT_ACTIVE_CLOSED|CREDIT_ACTIVE_ACTIVE|CREDIT_ACTIVE_SOLD|CREDIT_ACTIVE_BAD_DEBT|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+--------------

In [14]:
# List the distinct CREDIT_CURRENCY values present in the raw data
spark_query = """
SELECT distinct CREDIT_CURRENCY FROM data_ac
"""
data_ac.createOrReplaceTempView("data_ac")

check_CREDIT_CURRENCY= spark.sql(spark_query)
# Register the check under its own view name. It used to be registered as
# "df_tmp_01", which left that view pointing at this one-column result
# instead of the flagged dataset.
check_CREDIT_CURRENCY.createOrReplaceTempView("check_credit_currency")
#check_CREDIT_CURRENCY.show()

In [15]:
# One 0/1 indicator column per CREDIT_CURRENCY value
credit_currency_query = """
SELECT *,
    CASE
        WHEN CREDIT_CURRENCY = "currency 1" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_1,
    CASE
        WHEN CREDIT_CURRENCY = "currency 2" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_2,
    CASE
        WHEN CREDIT_CURRENCY = "currency 3" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_3,
    CASE
        WHEN CREDIT_CURRENCY = "currency 4" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_4
FROM df_tmp_02
ORDER BY `SK_ID_BUREAU`
"""

# (Re)register the input view right before querying it
df_tmp_02.createOrReplaceTempView("df_tmp_02")
df_tmp_03 = spark.sql(credit_currency_query)

df_tmp_03.createOrReplaceTempView("df_tmp_03")

In [16]:
# Preview the first 10 rows, now including the CREDIT_CURRENCY indicator columns
df_tmp_03.show(10)

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------------+---------------+----------------+--------------------+--------------------+------------------+----------------------+-----------------+-----------------+-----------------+-----------------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|         CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|CREDIT_ACTIVE_CLOSED|CREDIT_ACTIVE_ACTIVE|CREDIT_ACTIVE_SOLD|CREDIT_ACTIVE_BAD_DEBT|CREDIT_CURRENCY_1|CREDIT_CURRENCY_2|CREDIT_CURRENCY_3|CREDIT_CURREN

# Sumarizar na visão contrato — SK_ID_BUREAU (Automatizada)

In [17]:
# NOTE: this import shadows the Python builtins round, sum, max and min for the
# rest of the notebook; later cells rely on these Spark versions.
from pyspark.sql.functions import col, round, sum, avg, max, min, when

# Columns to aggregate
columns_agg_total = ['CREDIT_DAY_OVERDUE','DAYS_CREDIT_ENDDATE','DAYS_ENDDATE_FACT','AMT_CREDIT_MAX_OVERDUE',
                           'CNT_CREDIT_PROLONG','AMT_CREDIT_SUM','AMT_CREDIT_SUM_DEBT',
                           'AMT_CREDIT_SUM_LIMIT','AMT_CREDIT_SUM_OVERDUE','DAYS_CREDIT_UPDATE','AMT_ANNUITY']

columns_flags = list_flags_columns

# Statistics per column kind, in output order: day-based columns only get
# max/min, every other column also gets total and average.
stats_for_days = (("MAX", max), ("MIN", min))
stats_for_amounts = (("TOT", sum), ("MED", avg), ("MAX", max), ("MIN", min))

expressions_agg = []

for flag in columns_flags:
    for column in columns_agg_total:
        # Keep the value only for rows inside the window (null otherwise, and
        # nulls are ignored by the aggregates). The explicit cast makes max/min
        # compare numbers even when the CSV columns were loaded as strings,
        # where the comparison would otherwise be lexicographic.
        value_in_window = when(col(flag) == 1, col(column).cast("double"))
        stats = stats_for_days if "DAY" in column else stats_for_amounts
        for stat_name, stat_func in stats:
            expressions_agg.append(
                round(stat_func(value_in_window), 2).alias(f"QT_{stat_name}_{column.upper()}_{flag.upper()}")
            )

expressions_agg = tuple(expressions_agg)

# Apply the aggregation expressions, one output row per SK_ID_BUREAU
df_tmp_04 = df_tmp_03.groupBy("SK_ID_BUREAU").agg(*expressions_agg).orderBy("SK_ID_BUREAU")

In [18]:
# Preview the first 10 rows of the aggregated data
df_tmp_04.show(10)

+------------+-----------------------------------------+-----------------------------------------+------------------------------------------+------------------------------------------+----------------------------------------+----------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+-------------------------------------------+--------------

# Join com a bureau balance

In [19]:
# Read the aggregated bureau balance file.
# inferSchema=True lets Spark parse numeric columns as numbers; without it every
# column is a string and later max/min aggregations compare text lexicographically.
bureau_balance = spark.read.csv(dir_base + "bureau_balance_agg.csv", header=True, inferSchema=True)

In [20]:
# Left join: keep every row of df_tmp_04 and attach the bureau balance
# columns where a matching SK_ID_BUREAU exists.
df_tmp_05 = df_tmp_04.join(bureau_balance, on='SK_ID_BUREAU', how='left')

In [21]:
# Preview the first 10 rows of the joined data
df_tmp_05.show(10)

+------------+-----------------------------------------+-----------------------------------------+------------------------------------------+------------------------------------------+----------------------------------------+----------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+-------------------------------------------+--------------

In [22]:
# Number of rows after the join with bureau balance
df_tmp_05.count()

1716428

In [23]:
# Number of columns after the join with bureau balance
len(df_tmp_05.columns)

133

# Join deste resultado com a base bureau

In [24]:
# Bring SK_ID_CURR back: take the (SK_ID_BUREAU, SK_ID_CURR) pairs from
# df_tmp_03 and left-join the aggregated columns onto them by SK_ID_BUREAU.
df_tmp_06 = df_tmp_03.select("SK_ID_BUREAU", "SK_ID_CURR").join(df_tmp_05, on='SK_ID_BUREAU', how='left')

In [25]:
# Preview the first 10 rows, now including SK_ID_CURR
df_tmp_06.show(10)

+------------+----------+-----------------------------------------+-----------------------------------------+------------------------------------------+------------------------------------------+----------------------------------------+----------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+-------------------------------------------+---

In [26]:
# Number of columns after adding SK_ID_CURR
len(df_tmp_06.columns)

134

# Sumarizar base resultante na visão cliente

In [27]:
# Columns to aggregate: everything except the two identifier columns
key_columns = ('SK_ID_CURR', 'SK_ID_BUREAU')
columns_agg_total = [name for name in df_tmp_06.columns if name not in key_columns]

# Statistics per column kind, as (alias prefix, aggregate function) in output
# order: day-based columns only get max/min, every other column also gets
# total and average.
stats_for_days = (("QT_MAX", max), ("QT_MIN", min))
stats_for_values = (("VL_TOT", sum), ("VL_MED", avg), ("VL_MAX", max), ("VL_MIN", min))

expressions_agg = []

for column in columns_agg_total:
    # The explicit cast makes max/min compare numbers even for columns that
    # were loaded from CSV as strings, where the comparison would otherwise be
    # lexicographic.
    numeric_value = col(column).cast("double")
    stats = stats_for_days if 'DAY' in column else stats_for_values
    for prefix, stat_func in stats:
        expressions_agg.append(round(stat_func(numeric_value), 2).alias(f"{prefix}_{column.upper()}"))

expressions_agg = tuple(expressions_agg)

# Apply the aggregation expressions, one output row per SK_ID_CURR
df_tmp_07 = df_tmp_06.groupBy("SK_ID_CURR").agg(*expressions_agg).orderBy("SK_ID_CURR")

In [28]:
#Número de colunas
#len(df_tmp_07.columns)

In [29]:
# Mostrar o DataFrame resultante
#df_tmp_07.show(5)

# Salvar tabela sumarizada

In [30]:
# Destination of the summarised table, under the base data directory
output_path = dir_base + "bureau_agg.csv"

# Bring all rows into a single partition before writing
df_tmp_07 = df_tmp_07.repartition(1)

# Replace any previous output and write the column names as a header row
csv_writer = df_tmp_07.write.mode("overwrite")
csv_writer.csv(output_path, header=True)