## Configuração do ambiente para utilização do Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Fazendo download
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Descompactando os arquivos
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Importando a biblioteca os
import os

# Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"


# instalando a findspark
!pip install -q findspark

# Importando a findspark
import findspark

# Iniciando o findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("Minha Primeira Aplicação no Pyspark") \
        .getOrCreate()

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

## Leitura dos dados

In [None]:
import pandas as pd

# Load the bureau CSV with pandas for a quick look at the raw columns.
caminho_bureau_csv = '/content/drive/MyDrive/bases_dados_projeto_credito/bureau.csv'
df1 = pd.read_csv(caminho_bureau_csv)

# Last expression of the cell: render the first five rows.
df1.head()

Unnamed: 0,SK_ID_CURR,SK_ID_BUREAU,CREDIT_ACTIVE,CREDIT_CURRENCY,DAYS_CREDIT,CREDIT_DAY_OVERDUE,DAYS_CREDIT_ENDDATE,DAYS_ENDDATE_FACT,AMT_CREDIT_MAX_OVERDUE,CNT_CREDIT_PROLONG,AMT_CREDIT_SUM,AMT_CREDIT_SUM_DEBT,AMT_CREDIT_SUM_LIMIT,AMT_CREDIT_SUM_OVERDUE,CREDIT_TYPE,DAYS_CREDIT_UPDATE,AMT_ANNUITY
0,215354,5714462,Closed,currency 1,-497,0,-153.0,-153.0,,0,91323.0,0.0,,0.0,Consumer credit,-131,
1,215354,5714463,Active,currency 1,-208,0,1075.0,,,0,225000.0,171342.0,,0.0,Credit card,-20,
2,215354,5714464,Active,currency 1,-203,0,528.0,,,0,464323.5,,,0.0,Consumer credit,-16,
3,215354,5714465,Active,currency 1,-203,0,,,,0,90000.0,,,0.0,Credit card,-16,
4,215354,5714466,Active,currency 1,-629,0,1197.0,,77674.5,0,2700000.0,,,0.0,Consumer credit,-21,


In [None]:
# Read the bureau CSV file (the first line holds the column names).
# inferSchema=True makes Spark take an extra pass over the file to type numeric
# columns as int/double. Without it every column is a string, and the later
# max/min aggregations and ORDER BY clauses would compare values lexicographically
# (e.g. "528.0" > "1075.0") instead of numerically.
dados = spark.read.csv(
    "/content/drive/MyDrive/bases_dados_projeto_credito/bureau.csv",
    header=True,
    inferSchema=True,
)

# Display the first rows of the Spark DataFrame.
dados.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|    CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|    215354|     5714462|       Closed|     currency 1|       -497|                 0|             -153.0|           -153

## Criação de flags para nos auxiliar na visão temporal dos dados

In [None]:
# Query that keeps every bureau column and appends five 0/1 window flags.
# Each flag is 1 when DAYS_CREDIT is at or above the negative day threshold
# (-90, -180, -360, -720, -1080, i.e. 30 days per month for 3/6/12/24/36 months)
# and 0 otherwise. Rows come back ordered by SK_ID_BUREAU.
consulta_flags_temporais = """
SELECT
    *,
      CASE
        WHEN DAYS_CREDIT >= -90 THEN 1
        ELSE 0
    END AS ultimos_3_meses,
    CASE
        WHEN DAYS_CREDIT >= -180 THEN 1
        ELSE 0
    END AS ultimos_6_meses,
    CASE
        WHEN DAYS_CREDIT >= -360 THEN 1
        ELSE 0
    END AS ultimos_12_meses,
    CASE
        WHEN DAYS_CREDIT >= -720 THEN 1
        ELSE 0
    END AS ultimos_24_meses,
    CASE
        WHEN DAYS_CREDIT >= -1080 THEN 1
        ELSE 0
    END AS ultimos_36_meses
FROM dados
ORDER BY `SK_ID_BUREAU`;
"""

# Register the raw bureau DataFrame as the temp view "dados" so Spark SQL can query it.
dados.createOrReplaceTempView("dados")

# Run the query and expose its result as the temp view "df_temp_01" for the next step.
df_temp_01 = spark.sql(consulta_flags_temporais)
df_temp_01.createOrReplaceTempView("df_temp_01")

df_temp_01.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------------+---------------+----------------+----------------+----------------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|         CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|ultimos_24_meses|ultimos_36_meses|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------

In [None]:
# (Re)register the input view so this cell can be re-run on its own.
df_temp_01.createOrReplaceTempView("df_temp_01")

# Keep every column of df_temp_01 and append one 0/1 indicator column per
# CREDIT_ACTIVE value handled here ("Closed", "Active", "Sold", "Bad debt").
# Rows whose CREDIT_ACTIVE matches none of them get 0 in all four indicators.
df_temp_02 = spark.sql("""
SELECT
    *,
      CASE
        WHEN CREDIT_ACTIVE = "Closed" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_CLOSED,
    CASE
        WHEN CREDIT_ACTIVE = "Active" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_ACTIVE,
    CASE
        WHEN CREDIT_ACTIVE = "Sold" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_SOLD,
    CASE
        WHEN CREDIT_ACTIVE = "Bad debt" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_BAD_DEBT
FROM df_temp_01
ORDER BY `SK_ID_BUREAU`;
""")

# Register the result under its own name. The original code registered it as
# "df_temp_01", which silently replaced the input view with this output.
df_temp_02.createOrReplaceTempView("df_temp_02")
df_temp_02.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------------+---------------+----------------+----------------+----------------+--------------------+--------------------+------------------+----------------------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|         CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|ultimos_24_meses|ultimos_36_meses|CREDIT_ACTIVE_CLOSED|CREDIT_ACTIVE_ACTIVE|CREDIT_ACTIVE_SOLD|CREDIT_ACTIVE_BAD_DEBT|
+----------+------------+-------------+---------------+-----------+---

In [None]:
# (Re)register the input view so this cell can be re-run on its own.
df_temp_02.createOrReplaceTempView("df_temp_02")

# Keep every column of df_temp_02 and append one 0/1 indicator column per
# CREDIT_CURRENCY value handled here ("currency 1" .. "currency 4").
# Rows whose CREDIT_CURRENCY matches none of them get 0 in all four indicators.
df_temp_03 = spark.sql("""
SELECT
    *,
      CASE
        WHEN CREDIT_CURRENCY = "currency 1" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_currency_1,
    CASE
        WHEN CREDIT_CURRENCY = "currency 2" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_currency_2,
    CASE
        WHEN CREDIT_CURRENCY = "currency 3" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_currency_3,
    CASE
        WHEN CREDIT_CURRENCY = "currency 4" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_currency_4
FROM df_temp_02
ORDER BY `SK_ID_BUREAU`;
""")

# Register the result under its own name. The original code registered it as
# "df_temp_02", which silently replaced the input view with this output.
df_temp_03.createOrReplaceTempView("df_temp_03")
df_temp_03.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------------+---------------+----------------+----------------+----------------+--------------------+--------------------+------------------+----------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|         CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|ultimos_24_meses|ultimos_36_meses|CREDIT_ACTIVE_CLOSED|CREDIT_ACTIVE_ACTIVE|CREDI

## Sumarizar na visão contrato (SK_ID_BUREAU), por janela temporal (Automatizada)

In [None]:
# NOTE: this import shadows the Python built-ins round/sum/max/min for the rest
# of the notebook. It is kept unchanged because later cells use these same names.
from pyspark.sql.functions import col, round, sum, avg, max, min, when

# Bureau columns that will be summarised inside each time window.
colunas_agregacao_total = ['CREDIT_DAY_OVERDUE','DAYS_CREDIT_ENDDATE','DAYS_ENDDATE_FACT','AMT_CREDIT_MAX_OVERDUE',
                           'CNT_CREDIT_PROLONG','AMT_CREDIT_SUM','AMT_CREDIT_SUM_DEBT',
                           'AMT_CREDIT_SUM_LIMIT','AMT_CREDIT_SUM_OVERDUE','DAYS_CREDIT_UPDATE','AMT_ANNUITY']

# 0/1 window flags; each one restricts the aggregation to rows where the flag is 1.
colunas_flags = ['ultimos_3_meses','ultimos_6_meses','ultimos_12_meses','ultimos_24_meses','ultimos_36_meses']
# colunas_cat1 = ['CREDIT_ACTIVE_CLOSED','CREDIT_ACTIVE_ACTIVE','CREDIT_ACTIVE_SOLD','CREDIT_ACTIVE_BAD_DEBT']
# colunas_cat2 = ['CREDIT_CURRENCY_currency_1','CREDIT_CURRENCY_currency_2','CREDIT_CURRENCY_currency_3','CREDIT_CURRENCY_currency_4']

# (alias prefix, aggregate function) pairs, in output-column order.
# Columns whose name contains 'DAY' only get max/min; all others get sum/avg/max/min.
estatisticas_dias = [("QT_MAX", max), ("QT_MIN", min)]
estatisticas_valores = [("VL_TOT", sum), ("VL_MED", avg), ("VL_MAX", max), ("VL_MIN", min)]

expressoes_agregacao = []

for flag in colunas_flags:
    for coluna in colunas_agregacao_total:
        # Value of the column when the flag is 1, NULL otherwise (aggregates skip NULLs).
        # The explicit cast makes max/min compare numerically even when the source
        # column is a string (e.g. a CSV read without schema inference); max/min on
        # strings are lexicographic, so "528.0" would beat "1075.0".
        valor_na_janela = when(col(flag) == 1, col(coluna).cast("double"))

        estatisticas = estatisticas_dias if 'DAY' in coluna else estatisticas_valores
        for prefixo, funcao_agregacao in estatisticas:
            expressoes_agregacao.append(
                round(funcao_agregacao(valor_na_janela), 2)
                .alias(f"{prefixo}_{coluna.upper()}_{flag.upper()}")
            )

# Aggregate per SK_ID_BUREAU and sort the result by that key.
df_temp_04 = df_temp_03.groupBy("SK_ID_BUREAU").agg(*expressoes_agregacao).orderBy("SK_ID_BUREAU")

# Display the resulting DataFrame.
df_temp_04.show()

+------------+-----------------------------------------+-----------------------------------------+------------------------------------------+------------------------------------------+----------------------------------------+----------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+-------------------------------------------+--------------

## Join com a bureau balance

In [None]:
# Read the bureau_balance aggregate CSV (first line holds the column names).
# inferSchema=True types numeric columns as int/double instead of leaving every
# column as a string, so later max/min aggregations over these columns compare
# numerically rather than lexicographically.
bureau_balance = spark.read.csv(
    "/content/drive/MyDrive/bases_dados_projeto_credito/bases_tratadas/bureau_balance_agg.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Left join on SK_ID_BUREAU: every row of df_temp_04 is kept, and the
# bureau_balance columns are NULL where no matching key exists.
df_temp_05 = df_temp_04.join(
    bureau_balance,
    on='SK_ID_BUREAU',
    how='left',
)

# Display the joined result.
df_temp_05.show()

+------------+-----------------------------------------+-----------------------------------------+------------------------------------------+------------------------------------------+----------------------------------------+----------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+-------------------------------------------+--------------

## Join deste resultado com a base bureau

In [None]:
# Take only the two key columns from df_temp_03 so each SK_ID_BUREAU row
# carries its SK_ID_CURR into the joined result.
chaves_bureau_cliente = df_temp_03.select("SK_ID_BUREAU", "SK_ID_CURR")

# Left join on SK_ID_BUREAU attaches the columns of df_temp_05 to those keys.
df_temp_06 = chaves_bureau_cliente.join(
    df_temp_05,
    on='SK_ID_BUREAU',
    how='left',
)

# Display the joined result.
df_temp_06.show()

+------------+----------+-----------------------------------------+-----------------------------------------+------------------------------------------+------------------------------------------+----------------------------------------+----------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+-------------------------------------------+---

## Sumarizar base resultante na visão cliente

In [None]:
# Aggregate every column of df_temp_06 except the two identifier columns.
colunas_chave = ('SK_ID_CURR', 'SK_ID_BUREAU')
colunas_agregacao_total = [c for c in df_temp_06.columns if c not in colunas_chave]

# (alias prefix, aggregate function) pairs, in output-column order.
# Columns whose name contains 'DAY' only get max/min; all others get sum/avg/max/min.
# `round`, `sum`, `avg`, `max`, `min` and `col` are the pyspark.sql.functions
# versions imported in an earlier cell, not the Python built-ins.
estatisticas_dias = [("QT_MAX", max), ("QT_MIN", min)]
estatisticas_valores = [("VL_TOT", sum), ("VL_MED", avg), ("VL_MAX", max), ("VL_MIN", min)]

expressoes_agregacao = []

for coluna in colunas_agregacao_total:
    # The explicit cast makes max/min compare numerically even when a column is a
    # string (e.g. it came from a CSV read without schema inference); max/min on
    # strings are lexicographic, so "528.0" would beat "1075.0".
    valor = col(coluna).cast("double")

    estatisticas = estatisticas_dias if 'DAY' in coluna else estatisticas_valores
    for prefixo, funcao_agregacao in estatisticas:
        expressoes_agregacao.append(
            round(funcao_agregacao(valor), 2).alias(f"{prefixo}_{coluna.upper()}")
        )

# Aggregate per SK_ID_CURR and sort the result by that key.
df_temp_07 = df_temp_06.groupBy("SK_ID_CURR").agg(*expressoes_agregacao).orderBy("SK_ID_CURR")

# Display the resulting DataFrame.
df_temp_07.show()

+----------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+-------------------------------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+-----------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+----------------------------------------------------+-----------------------------

## Salvar a tabela sumarizada

In [None]:
# Bring all rows into a single partition before writing.
df_temp_07 = df_temp_07.repartition(1)

# Write as CSV with a header row to the path "bureau_agg.csv" (relative to the
# current working directory), replacing any output already present there.
(
    df_temp_07
    .write
    .mode("overwrite")
    .csv("bureau_agg.csv", header=True)
)