Importação das Bibliotecas Necessárias

In [1]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

Configuração do SparkSession

In [2]:
import os
from pyspark.sql import SparkSession

# Configurando o IP local para evitar o aviso de loopback
#os.environ['SPARK_LOCAL_IP'] = '172.30.205.65'

# Iniciando a SparkSession com nível de log definido para ERROR para reduzir avisos
spark = SparkSession.builder \
    .appName("CreditRiskETL") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")  # Ajustando para mostrar apenas erros críticos


24/11/01 22:02:50 WARN Utils: Your hostname, SABRLDXV9K03 resolves to a loopback address: 127.0.1.1; using 172.30.205.65 instead (on interface eth0)
24/11/01 22:02:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/01 22:02:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/01 22:02:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Definição do Diretório Base e Criação das Pastas

In [3]:
# Obter o diretório de trabalho atual
current_dir = os.getcwd()

# Obter o diretório raiz do projeto (subindo um nível)
project_root = os.path.dirname(current_dir)

# Definição dos diretórios das camadas do ETL
base_dir = os.path.join(current_dir, 'etl_camadas')

def create_dir(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)


Camada Bronze: Carregamento dos Dados Brutos

In [4]:
# Definir os caminhos para cada camada
bronze_dir = os.path.join(base_dir, 'bronze')
silver_dir = os.path.join(base_dir, 'silver')
gold_dir = os.path.join(base_dir, 'gold')

# Criando os diretórios
create_dir(bronze_dir)
create_dir(silver_dir)
create_dir(gold_dir)

# Caminho para o arquivo CSV original
raw_data_path = os.path.join(project_root, 'dataset', 'credit_risk_dataset.csv')

# Verificando se o arquivo existe
if not os.path.exists(raw_data_path):
    print(f"Arquivo CSV não encontrado: {raw_data_path}")
else:
    print(f"Carregando dados de: {raw_data_path}")
    # Carregando os dados brutos com PySpark
    df_bronze = spark.read.csv(raw_data_path, header=True, inferSchema=True)
    print("\nDados carregados com sucesso. Exibindo as primeiras linhas:")
    df_bronze.show(5)  # Mostra as primeiras 5 linhas

    # Exibindo o esquema do DataFrame
    print("\nEsquema do DataFrame:")
    df_bronze.printSchema()

    # Salvando a camada bronze
    bronze_file_path = os.path.join(bronze_dir, 'credit_risk_bronze.parquet')
    try:
        df_bronze.write.mode('overwrite').parquet(bronze_file_path)
        print(f"\nCamada bronze salva com sucesso em: {bronze_file_path}")
    except Exception as e:
        print(f"\nOcorreu um erro ao salvar a camada bronze: {e}")

    # Verificando se o diretório Parquet foi criado
    if os.path.isdir(bronze_file_path):
        print(f"O arquivo Parquet foi criado com sucesso em: {bronze_file_path}")
    else:
        print("O arquivo Parquet não foi encontrado após a tentativa de salvamento.")

    # Opcional: Carregando e exibindo os dados salvos
    print("\nVerificando os dados salvos na camada bronze:")
    df_bronze_saved = spark.read.parquet(bronze_file_path)
    df_bronze_saved.show(5)


Carregando dados de: /home/savidotti/programmers/credit-risk-analysis/dataset/credit_risk_dataset.csv

Dados carregados com sucesso. Exibindo as primeiras linhas:
+----------+-------------+---------------------+-----------------+-----------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_status|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|
+----------+-------------+---------------------+-----------------+-----------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+
|        22|        59000|                 RENT|            123.0|   PERSONAL|         D|    35000|        16.02|          1|               0.59|                        Y|                         3|
|        21|         9600|               

                                                                                


Camada bronze salva com sucesso em: /home/savidotti/programmers/credit-risk-analysis/pipeline_etl/etl_camadas/bronze/credit_risk_bronze.parquet
O arquivo Parquet foi criado com sucesso em: /home/savidotti/programmers/credit-risk-analysis/pipeline_etl/etl_camadas/bronze/credit_risk_bronze.parquet

Verificando os dados salvos na camada bronze:
+----------+-------------+---------------------+-----------------+-----------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_status|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|
+----------+-------------+---------------------+-----------------+-----------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+
|        22|        59000|                 RENT|          

Camada Silver: Limpeza e Transformação Básica

In [5]:
# Criando uma cópia dos dados brutos da camada bronze
df_silver = df_bronze

# Tratamento de valores ausentes em 'person_emp_length'
median_emp_length = df_silver.approxQuantile("person_emp_length", [0.5], 0.25)[0]
df_silver = df_silver.na.fill({"person_emp_length": median_emp_length})
print("\nTratamento de valores ausentes concluído para 'person_emp_length'.")

# Convertendo tipos de dados para garantir consistência
df_silver = df_silver \
    .withColumn("person_age", col("person_age").cast(IntegerType())) \
    .withColumn("person_income", col("person_income").cast(DoubleType())) \
    .withColumn("loan_amnt", col("loan_amnt").cast(DoubleType())) \
    .withColumn("loan_int_rate", col("loan_int_rate").cast(DoubleType())) \
    .withColumn("loan_status", col("loan_status").cast(IntegerType())) \
    .withColumn("loan_percent_income", col("loan_percent_income").cast(DoubleType())) \
    .withColumn("cb_person_cred_hist_length", col("cb_person_cred_hist_length").cast(IntegerType()))
print("\nConversão de tipos de dados concluída.")

# Exibindo as primeiras linhas da camada silver e o esquema do DataFrame
print("\nExibindo as primeiras linhas da camada silver:")
df_silver.show(5)
print("\nEsquema do DataFrame da camada silver:")
df_silver.printSchema()

# Salvando a camada silver
silver_file_path = os.path.join(silver_dir, 'credit_risk_silver.parquet')
try:
    df_silver.write.mode('overwrite').parquet(silver_file_path)
    print(f"\nCamada silver salva com sucesso em: {silver_file_path}")
except Exception as e:
    print(f"\nOcorreu um erro ao salvar a camada silver: {e}")

# Verificando se o diretório Parquet foi criado
if os.path.isdir(silver_file_path):
    print(f"O arquivo Parquet foi criado com sucesso em: {silver_file_path}")
else:
    print("O arquivo Parquet não foi encontrado após a tentativa de salvamento.")

# Carregando e exibindo os dados salvos na camada silver para verificação
print("\nVerificando os dados salvos na camada silver:")
df_silver_saved = spark.read.parquet(silver_file_path)
df_silver_saved.show(5)



Tratamento de valores ausentes concluído para 'person_emp_length'.

Conversão de tipos de dados concluída.

Exibindo as primeiras linhas da camada silver:
+----------+-------------+---------------------+-----------------+-----------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_status|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|
+----------+-------------+---------------------+-----------------+-----------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+
|        22|      59000.0|                 RENT|            123.0|   PERSONAL|         D|  35000.0|        16.02|          1|               0.59|                        Y|                         3|
|        21|       9600.0|                  OWN|

Camada Gold: Enriquecimento e Preparação para ML

In [9]:
# Inicializando a SparkSession
spark = SparkSession.builder.appName("CreditRiskETL").getOrCreate()

# Definindo os caminhos para as camadas silver e gold
project_root = os.getcwd()
silver_dir = os.path.join(project_root, 'etl_camadas', 'silver')
gold_dir = os.path.join(project_root, 'etl_camadas', 'gold')
silver_file_path = os.path.join(silver_dir, 'credit_risk_silver.parquet')
gold_file_path = os.path.join(gold_dir, 'credit_risk_gold.parquet')

# Verificação da camada silver
if not os.path.exists(silver_file_path):
    print(f"Arquivo silver não encontrado: {silver_file_path}")
else:
    df_silver = spark.read.parquet(silver_file_path)
    print("\nArquivo silver carregado com sucesso.")

    # Criando uma cópia dos dados da camada silver para a camada gold
    df_gold = df_silver

    # Tratamento de valores nulos em 'person_home_ownership'
    df_gold = df_gold.na.fill({"person_home_ownership": "OTHER"})
    print("\nTratamento de valores nulos concluído para 'person_home_ownership'.")

    # Criar uma variável binária para indicar se a pessoa possui casa própria
    df_gold = df_gold.withColumn(
        'is_home_owner',
        when(col('person_home_ownership').isin('OWN', 'MORTGAGE'), 1).otherwise(0)
    )
    print("\nVariável 'is_home_owner' criada com sucesso.")

    # Garantindo que todas as colunas de features estejam sem nulos
    df_gold = df_gold.na.fill({
        "person_age": 0, 
        "person_income": 0.0,
        "person_emp_length": 0.0,
        "loan_amnt": 0.0,
        "loan_int_rate": 0.0,
        "loan_percent_income": 0.0,
        "cb_person_cred_hist_length": 0,
        "is_home_owner": 0
    })
    print("\nValores nulos nas colunas numéricas foram preenchidos com valores padrão.")

    # Codificação de variáveis categóricas usando StringIndexer e OneHotEncoder
    categorical_vars = ['loan_intent', 'loan_grade', 'cb_person_default_on_file']
    indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid='keep') for col in categorical_vars]
    encoders = [OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_vec") for col in categorical_vars]

    # Aplicando o Pipeline
    pipeline = Pipeline(stages=indexers + encoders)
    df_gold = pipeline.fit(df_gold).transform(df_gold)
    print("\nCodificação de variáveis categóricas concluída.")

    # Criando o vetor de features
    assembler_inputs = [
        'person_age',
        'person_income',
        'person_emp_length',
        'loan_amnt',
        'loan_int_rate',
        'loan_percent_income',
        'cb_person_cred_hist_length',
        'is_home_owner'
    ] + [encoder.getOutputCol() for encoder in encoders]

    assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features", handleInvalid="keep")
    df_gold = assembler.transform(df_gold)
    print("\nVetor de features criado com sucesso.")

    # Selecionando as colunas necessárias para a camada gold, incluindo as colunas originais e de features
    df_gold = df_gold.select(
        'person_age', 
        'person_income', 
        'person_emp_length', 
        'loan_amnt', 
        'loan_int_rate', 
        'loan_percent_income', 
        'cb_person_cred_hist_length', 
        'is_home_owner', 
        'features', 
        col('loan_status').alias('label')
    )

    # Exibindo as primeiras linhas da camada gold e o esquema do DataFrame
    print("\nExibindo as primeiras linhas da camada gold:")
    df_gold.show(5)
    print("\nEsquema do DataFrame da camada gold:")
    df_gold.printSchema()

    # Salvando a camada gold
    os.makedirs(gold_dir, exist_ok=True)
    try:
        df_gold.write.mode('overwrite').parquet(gold_file_path)
        print(f"\nCamada gold salva com sucesso em: {gold_file_path}")
    except Exception as e:
        print(f"\nOcorreu um erro ao salvar a camada gold: {e}")

    # Verificando se o diretório Parquet foi criado
    if os.path.isdir(gold_file_path):
        print(f"O arquivo Parquet foi criado com sucesso em: {gold_file_path}")
    else:
        print("O arquivo Parquet não foi encontrado após a tentativa de salvamento.")

    # Carregando e exibindo os dados salvos na camada gold para verificação
    print("\nVerificando os dados salvos na camada gold:")
    df_gold_saved = spark.read.parquet(gold_file_path)
    df_gold_saved.show(5)

    # Verificação final para garantir que 'loan_int_rate' está presente na camada gold
    if 'loan_int_rate' in df_silver.columns:
        print("\nColuna 'loan_int_rate' confirmada na camada silver e propagada para a camada gold.")
    else:
        print("\nA coluna 'loan_int_rate' não foi encontrada na camada silver; revise o pipeline ETL.")

# Finalizando a SparkSession
spark.stop()



Arquivo silver carregado com sucesso.

Tratamento de valores nulos concluído para 'person_home_ownership'.

Variável 'is_home_owner' criada com sucesso.

Valores nulos nas colunas numéricas foram preenchidos com valores padrão.

Codificação de variáveis categóricas concluída.

Vetor de features criado com sucesso.

Exibindo as primeiras linhas da camada gold:
+----------+-------------+-----------------+---------+-------------+-------------------+--------------------------+-------------+--------------------+-----+
|person_age|person_income|person_emp_length|loan_amnt|loan_int_rate|loan_percent_income|cb_person_cred_hist_length|is_home_owner|            features|label|
+----------+-------------+-----------------+---------+-------------+-------------------+--------------------------+-------------+--------------------+-----+
|        22|      59000.0|            123.0|  35000.0|        16.02|               0.59|                         3|            0|(23,[0,1,2,3,4,5,...|    1|
|        