In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import to_date, lit, date_format
from pyspark.sql.types import IntegerType

# Stop a SparkSession left over from a previous run of this cell, so the
# builder below does not hand back the old session via getOrCreate().
try:
    spark.stop()
except NameError:
    # First run on a fresh kernel: `spark` does not exist yet.
    pass
except Exception as exc:
    # Still best-effort, but no longer silent; unlike a bare `except:`,
    # KeyboardInterrupt / SystemExit are allowed to propagate.
    print(f"Ignoring error while stopping the previous SparkSession: {exc}")

# Build (or reuse) the session against the standalone master.
spark = (
    SparkSession.builder
    .appName("Bureau")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/21 12:19:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:

# Load the raw bureau CSV; the first line is the header and column types
# are inferred by Spark.
bureau = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/data/raw/bureau.csv")
)

# Make the DataFrame addressable from SQL as "bureau".
bureau.createOrReplaceTempView("bureau")

# Row and column counts
num_columns = len(bureau.columns)
num_rows = bureau.count()

print(f'Quantidade de linhas: {num_rows}')
print(f'Quantidade de variaveis (colunas): {num_columns}')

# Preview the first rows without truncating cell contents.
bureau.show(n=5, truncate=False)

                                                                                

Quantidade de linhas: 1716428
Quantidade de variaveis (colunas): 17
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|CREDIT_TYPE    |DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|215354    |5714462     |Closed       |currency 1    

## Lendo os dados da base bureau_balance

In [3]:
# Read the bureau_balance dataset stored as parquet.
file_path = "/data/books/bureau_balance"
bureau_balance = spark.read.format("parquet").load(file_path)

In [4]:
# Left join: every bureau row is kept, with bureau_balance columns attached
# where SK_ID_BUREAU matches.
# NOTE(review): none of the columns aggregated later in this notebook come
# from bureau_balance — confirm whether this join is still needed.
df_temp_05 = bureau.join(bureau_balance, 'SK_ID_BUREAU', 'left')

In [5]:
from pyspark.sql.functions import upper

# Upper-case the categorical text columns so later comparisons use a single
# canonical spelling.
for nome_coluna in ("CREDIT_ACTIVE", "CREDIT_CURRENCY", "CREDIT_TYPE"):
    df_temp_05 = df_temp_05.withColumn(nome_coluna, upper(df_temp_05[nome_coluna]))

## Criando variáveis de flag temporal

In [6]:
# Expose the joined DataFrame to SQL under the name "dados".
df_temp_05.createOrReplaceTempView("dados")

# Adds four 0/1 columns (U3M, U6M, U9M, U12M). Each is 1 when DAYS_CREDIT is
# at or above -90, -180, -270 and -360 respectively, and 0 otherwise
# (a NULL DAYS_CREDIT falls through to the ELSE branch).
consulta_flags_temporais = """
SELECT
    *,
      CASE
        WHEN DAYS_CREDIT >= -90 THEN 1
        ELSE 0
    END AS U3M,
    CASE
        WHEN DAYS_CREDIT >= -180 THEN 1
        ELSE 0
    END AS U6M,
    CASE
        WHEN DAYS_CREDIT >= -270 THEN 1
        ELSE 0
    END AS U9M,    
    CASE
        WHEN DAYS_CREDIT >= -360 THEN 1
        ELSE 0
    END AS U12M
FROM dados
ORDER BY `SK_ID_BUREAU`;
"""

df_temp_01 = spark.sql(consulta_flags_temporais)
df_temp_01.createOrReplaceTempView("df_temp_01")
df_temp_01.show(n=5)


25/07/21 12:24:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+------------+----------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+------------------------

In [7]:
# Total number of rows in df_temp_01; kept in qtd_linhas so a later query can
# use it as the denominator of a percentage column.
qtd_linhas = df_temp_01.count()

                                                                                

In [8]:

# Frequency table of CREDIT_ACTIVE: absolute volume per category and its
# share (in %) of qtd_linhas, largest category first.
consulta_freq_credit_active = f"""
            Select
                CREDIT_ACTIVE,
                count(*) as VOLUME,
                round(100*(count(*)/{qtd_linhas}),2) as VOL_PERCENT
            from 
                df_temp_01
            group by 
                CREDIT_ACTIVE
            order by 
                VOLUME desc
"""
spark.sql(consulta_freq_credit_active).show(n=50, truncate=False)




+-------------+-------+-----------+
|CREDIT_ACTIVE|VOLUME |VOL_PERCENT|
+-------------+-------+-----------+
|CLOSED       |1079273|62.88      |
|ACTIVE       |630607 |36.74      |
|SOLD         |6527   |0.38       |
|BAD DEBT     |21     |0.0        |
+-------------+-------+-----------+



                                                                                

In [9]:
# (Re)bind the SQL name "df_temp_01" to the DataFrame of the same name so this
# cell gives the same result every time it is run.
df_temp_01.createOrReplaceTempView("df_temp_01")

# Adds two 0/1 indicator columns derived from CREDIT_ACTIVE:
#   CRED_ACT_CLD = 1 when the value is 'CLOSED'
#   CRED_ACT_ACT = 1 when the value is 'ACTIVE'
# String literals use single quotes: double quotes are only treated as
# strings by Spark SQL while ANSI double-quoted identifiers are disabled.
df_temp_02 = spark.sql("""
SELECT
    *,
    CASE
        WHEN CREDIT_ACTIVE = 'CLOSED' THEN 1
        ELSE 0
    END AS CRED_ACT_CLD,
    CASE
        WHEN CREDIT_ACTIVE = 'ACTIVE' THEN 1
        ELSE 0
    END AS CRED_ACT_ACT
FROM df_temp_01
ORDER BY `SK_ID_BUREAU`
""")

# Register the result under its own name. The original registered it as
# "df_temp_01", which silently repointed that view at a different DataFrame.
df_temp_02.createOrReplaceTempView("df_temp_02")
df_temp_02.show(5)



+------------+----------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+------------------------

                                                                                

In [10]:
# Total number of rows in df_temp_02; overwrites qtd_linhas so a later query
# can use it as the denominator of a percentage column.
qtd_linhas = df_temp_02.count()

                                                                                

In [12]:

# Frequency table of CREDIT_CURRENCY: absolute volume per category and its
# share (in %) of qtd_linhas, largest category first.
consulta_freq_credit_currency = f"""
            Select
                CREDIT_CURRENCY,
                count(*) as VOLUME,
                round(100*(count(*)/{qtd_linhas}),2) as VOL_PERCENT
            from 
                df_temp_01
            group by 
                CREDIT_CURRENCY
            order by 
                VOLUME desc
"""
spark.sql(consulta_freq_credit_currency).show(n=50, truncate=False)



+---------------+-------+-----------+
|CREDIT_CURRENCY|VOLUME |VOL_PERCENT|
+---------------+-------+-----------+
|CURRENCY 1     |1715020|99.92      |
|CURRENCY 2     |1224   |0.07       |
|CURRENCY 3     |174    |0.01       |
|CURRENCY 4     |10     |0.0        |
+---------------+-------+-----------+



                                                                                

In [13]:
# (Re)bind the SQL name "df_temp_02" to the DataFrame of the same name so this
# cell gives the same result every time it is run.
df_temp_02.createOrReplaceTempView("df_temp_02")

# Adds CRED_CURR_1: 1 when CREDIT_CURRENCY is 'CURRENCY 1', otherwise 0.
# String literals use single quotes: double quotes are only treated as
# strings by Spark SQL while ANSI double-quoted identifiers are disabled.
df_temp_03 = spark.sql("""
SELECT
    *,
    CASE
        WHEN CREDIT_CURRENCY = 'CURRENCY 1' THEN 1
        ELSE 0
    END AS CRED_CURR_1
FROM df_temp_02
ORDER BY `SK_ID_BUREAU`
""")

# Register the result under its own name. The original registered it as
# "df_temp_02", which silently repointed that view at a different DataFrame.
df_temp_03.createOrReplaceTempView("df_temp_03")
df_temp_03.show(5)

                                                                                

+------------+----------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------------------+-----------------------------+-----------------------------+------------------------

## Criando variáveis de primeira camada

In [14]:
from pyspark.sql.functions import col, round, sum, avg, max, min, when

# Função para gerar expressões de agregação com flags
def gerar_expressoes_agregacao_com_flags(colunas_agregacao_total, colunas_flags):
    """Build conditional aggregation expressions for every (flag, column) pair.

    For each flag column and each value column, only rows where the flag
    equals 1 contribute to the aggregate. Two expressions are produced per
    pair, each rounded to 2 decimals:

    * column name contains ``'DAY'`` -> max and min, aliased
      ``QT_MAX_<COL>_<FLAG>_BUREAU`` / ``QT_MIN_<COL>_<FLAG>_BUREAU``
    * otherwise -> sum and average, aliased
      ``VL_TOT_<COL>_<FLAG>_BUREAU`` / ``VL_MED_<COL>_<FLAG>_BUREAU``

    Args:
        colunas_agregacao_total: names of the columns to aggregate.
        colunas_flags: names of the 0/1 indicator columns used as filters.

    Returns:
        list of aggregation expressions, ordered flag by flag and, within a
        flag, column by column. Empty when either input is empty.
    """
    expressoes_agregacao = []

    for flag in colunas_flags:
        sufixo = f"{flag.upper()}_BUREAU"
        for coluna in colunas_agregacao_total:
            # when() without otherwise() gives NULL for rows where the flag is
            # not 1, so those rows are ignored by the aggregates below.
            valor = F.when(F.col(flag) == 1, F.col(coluna))

            # The pyspark functions are taken from the explicit `F` namespace,
            # so the result does not depend on the builtins round/sum/max/min
            # having been shadowed by an earlier import.
            if 'DAY' in coluna:
                agregacoes = (("QT_MAX", F.max), ("QT_MIN", F.min))
            else:
                agregacoes = (("VL_TOT", F.sum), ("VL_MED", F.avg))

            for prefixo, agregador in agregacoes:
                expressoes_agregacao.append(
                    F.round(agregador(valor), 2).alias(f"{prefixo}_{coluna.upper()}_{sufixo}")
                )

    return expressoes_agregacao

# Função para gerar expressões de agregação sem flags, apenas com categorias
def gerar_expressoes_agregacao_sem_flags(colunas_agregacao_total, colunas_cat):
    """Build conditional aggregation expressions for category indicator columns.

    The original body was a line-for-line copy of
    ``gerar_expressoes_agregacao_com_flags`` with ``cat`` in place of ``flag``.
    This function now delegates to it so the aggregation and naming rules
    live in one place. The name is kept for existing callers.

    Args:
        colunas_agregacao_total: names of the columns to aggregate.
        colunas_cat: names of the 0/1 category indicator columns used as
            filters.

    Returns:
        list of aggregation expressions, in the same order and with the same
        aliases that ``gerar_expressoes_agregacao_com_flags`` produces for
        these arguments.
    """
    return gerar_expressoes_agregacao_com_flags(colunas_agregacao_total, colunas_cat)

# Definir as colunas para agregação
# Columns to aggregate
colunas_agregacao_total = [
    'CREDIT_DAY_OVERDUE',
    'DAYS_CREDIT_ENDDATE',
    'DAYS_ENDDATE_FACT',
    'AMT_CREDIT_MAX_OVERDUE',
    'CNT_CREDIT_PROLONG',
    'AMT_CREDIT_SUM',
    'AMT_CREDIT_SUM_DEBT',
    'AMT_CREDIT_SUM_LIMIT',
    'AMT_CREDIT_SUM_OVERDUE',
    'DAYS_CREDIT_UPDATE',
    'AMT_ANNUITY',
]

# 0/1 indicator columns used to filter each aggregate
colunas_flags = ['U3M', 'U6M', 'U9M', 'U12M']
colunas_cat1 = ['CRED_ACT_CLD', 'CRED_ACT_ACT']
colunas_cat2 = ['CRED_CURR_1']

# One list of expressions per indicator family
expressoes_agregacao_flags = gerar_expressoes_agregacao_com_flags(colunas_agregacao_total, colunas_flags)
expressoes_agregacao_cat1 = gerar_expressoes_agregacao_sem_flags(colunas_agregacao_total, colunas_cat1)
expressoes_agregacao_cat2 = gerar_expressoes_agregacao_sem_flags(colunas_agregacao_total, colunas_cat2)

# Single flat list, in the order: temporal flags, credit status, currency
expressoes_agregacao = [
    *expressoes_agregacao_flags,
    *expressoes_agregacao_cat1,
    *expressoes_agregacao_cat2,
]

# One output row per SK_ID_CURR with every aggregate as a column
df_temp_04 = (
    df_temp_03
    .groupBy("SK_ID_CURR")
    .agg(*expressoes_agregacao)
    .orderBy("SK_ID_CURR")
)

# Report how many variables were created and their names; the first column is
# the grouping key, everything after it is new.
nomes_cols = df_temp_04.columns
nomes_cols_novas = nomes_cols[1:]
print('Quantidade Total de Variáveis Criadas:', len(nomes_cols) - 1)
print('Nomes das Variáveis Criadas:', nomes_cols_novas)
for _ in range(2):
    print('')

# Shape of the aggregated DataFrame (count() triggers the Spark job)
num_rows_df = df_temp_04.count()
num_columns_df = len(nomes_cols)

print(f'Quantidade de linhas do DataFrame: {num_rows_df}')
print(f'Quantidade de colunas do DataFrame: {num_columns_df}')
for _ in range(2):
    print('')

Quantidade Total de Variáveis Criadas: 154
Nomes das Variáveis Criadas: ['QT_MAX_CREDIT_DAY_OVERDUE_U3M_BUREAU', 'QT_MIN_CREDIT_DAY_OVERDUE_U3M_BUREAU', 'QT_MAX_DAYS_CREDIT_ENDDATE_U3M_BUREAU', 'QT_MIN_DAYS_CREDIT_ENDDATE_U3M_BUREAU', 'QT_MAX_DAYS_ENDDATE_FACT_U3M_BUREAU', 'QT_MIN_DAYS_ENDDATE_FACT_U3M_BUREAU', 'VL_TOT_AMT_CREDIT_MAX_OVERDUE_U3M_BUREAU', 'VL_MED_AMT_CREDIT_MAX_OVERDUE_U3M_BUREAU', 'VL_TOT_CNT_CREDIT_PROLONG_U3M_BUREAU', 'VL_MED_CNT_CREDIT_PROLONG_U3M_BUREAU', 'VL_TOT_AMT_CREDIT_SUM_U3M_BUREAU', 'VL_MED_AMT_CREDIT_SUM_U3M_BUREAU', 'VL_TOT_AMT_CREDIT_SUM_DEBT_U3M_BUREAU', 'VL_MED_AMT_CREDIT_SUM_DEBT_U3M_BUREAU', 'VL_TOT_AMT_CREDIT_SUM_LIMIT_U3M_BUREAU', 'VL_MED_AMT_CREDIT_SUM_LIMIT_U3M_BUREAU', 'VL_TOT_AMT_CREDIT_SUM_OVERDUE_U3M_BUREAU', 'VL_MED_AMT_CREDIT_SUM_OVERDUE_U3M_BUREAU', 'QT_MAX_DAYS_CREDIT_UPDATE_U3M_BUREAU', 'QT_MIN_DAYS_CREDIT_UPDATE_U3M_BUREAU', 'VL_TOT_AMT_ANNUITY_U3M_BUREAU', 'VL_MED_AMT_ANNUITY_U3M_BUREAU', 'QT_MAX_CREDIT_DAY_OVERDUE_U6M_BUREAU', 'QT_MIN



Quantidade de linhas do DataFrame: 305811
Quantidade de colunas do DataFrame: 155




                                                                                

In [15]:
# Persist the aggregated features as parquet, replacing any previous output.
df_temp_04.write.parquet('/data/books/bureau', mode="overwrite")


                                                                                

In [16]:
# Stop the SparkSession now that the output has been written.
spark.stop()