## Manipulação e Tratamento de Dados - Camada Silver

Camada de dados tratada e normalizada, confíavel para uso.

In [1]:

# IMPORTS AND LIBRARIES
import os
import boto3
from botocore.exceptions import ClientError
from datetime import datetime
import logging

# Configuração do logger
logger = logging.getLogger("minio_logger")
logger.setLevel(logging.INFO)


# Configurando o formato do log
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


# PySpark Libraries
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, explode, lit, from_json

from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType


In [2]:
# Variáveis Globais e de Ambiente para o Projeto.

os.environ["MINIO_KEY"] = "developer"
os.environ["MINIO_SECRET"] = "developer01"
os.environ["MINIO_ENDPOINT"] = "http://minio:9000"


# Na ausência do dbtuils ou do Metastore vou usar o boto para me ajudar
# a iteragir com o storage
s3_client = boto3.client(
    's3',
    endpoint_url = os.environ.get("MINIO_ENDPOINT"),
    aws_access_key_id = os.environ.get("MINIO_KEY"),
    aws_secret_access_key = os.environ.get("MINIO_SECRET")
)

bucket_name = "bank-databr"

# Paths Data Storage
root_path_dir = f"{bucket_name}"
landing_path_dir = f"{root_path_dir}/landing/bacen"
bronze_path_dir = f"{root_path_dir}/bronze"
silver_path_dir = f"{root_path_dir}/silver"

# Partição da tabela Bronze, Data de referência
dt_partition = datetime.now().strftime("%Y-%m-%d")

In [3]:

spark = SparkSession.builder \
                    .appName("SilverLayer") \
                    .config("spark.hadoop.fs.s3a.endpoint", os.environ["MINIO_ENDPOINT"]) \
                    .config("spark.hadoop.fs.s3a.access.key", os.environ["MINIO_KEY"]) \
                    .config("spark.hadoop.fs.s3a.secret.key", os.environ["MINIO_SECRET"]) \
                    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
                    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
                    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
                    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
                    .getOrCreate()


/opt/spark/bin/load-spark-env.sh: line 68: ps: command not found


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
software.amazon.awssdk#s3 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c3be89e5-3da9-494e-8616-cb8cc16109e3;1.0
	confs: [default]
	found software.amazon.awssdk#s3;2.26.30 in central
	found software.amazon.awssdk#aws-xml-protocol;2.26.30 in central
	found software.amazon.awssdk#aws-query-protocol;2.26.30 in central
	found software.amazon.awssdk#protocol-core;2.26.30 in central
	found software.amazon.awssdk#sdk-core;2.26.30 in central
	found software.amazon.awssdk#annotations;2.26.30 in central
	found software.amazon.awssdk#http-client-spi;2.26.30 in central
	found software.amazon.awssdk#utils;2.26.30 in central
	found org.reactivestreams#reactive-streams;1.0.4 in central
	found org.slf4j#slf4j-api;1.7.36 in central
	found software.amazon.awssdk#metric

### Funções Utilitárias - Utils

In [4]:

def check_table_exists_in_metastore(schema_name:str, table_name: str) -> bool:
    """ Verifica se a tabela já existe no schema indicado dentro do Catalógo """

    return spark._jsparkSession.catalog() \
                               .tableExists(f"{schema_name}.{table_name}")


def check_table_exists_by_location(s3_minio_client:boto3.client, destionation_table_path: str) -> bool:
    """ Verifica se a tabela já existe, usando o gerenciador do storage """

    # Vou adaptar usando o boto3 já que não tenho por aqui nem o dbutils nem um metastore para checkar via tableExists()
    
    # Extrai o bucket e o prefixo (path)
    bucket_name, path = destionation_table_path.replace("s3a://", "").split("/", 1)
    
    try:
        # Verifica se o diretório existe no S3 (se o prefixo da pasta tiver objetos)
        response = s3_minio_client.list_objects_v2(Bucket=bucket_name, Prefix=path, MaxKeys=1)
        return 'Contents' in response
        
    except ClientError as e:
        return False

def write_delta_table_by_location(df_input: DataFrame, destionation_table_path: str) -> None:
    """ Escreve a tabela Delta no Storage de forma particionada
        Se a tabela já existir realiza upsert na partição indicada
        Caso não exista primeiramente cria a tabela no modo 'append'

        Args:
            df_input: dataframe a ser gravado no storage
            destionation_table_path: local de destino no storage
    """

    if check_table_exists_by_location(s3_client, destionation_table_path): 

        print(f"Tabela já existe: escrevendo nova partição em:\n\t*{destionation_table_path}")

        df_input.write \
                .format("delta") \
                .mode("append") \
                .option("replaceWhere", f"partition = {dt_partition}") \
                .partitionBy("dt_partition") \
                .save(destionation_table_path)   
    else:
        
        print(f"Tabela ainda não existe, criando nova tabela em: \n\t*{destionation_table_path}")

        #TODO: atualizar para considerar o EvolutionSchema, option("mergeSchema": True)
        # No formato atual estou considerando somente como EnforceSchema.

        df_input.write \
                .format("delta") \
                .mode("append") \
                .partitionBy("dt_partition") \
                .save(destionation_table_path) 

   

### 01. Pagamentos  Trimestrais


Leitura e tratamento da tabela Bronze. Aqui vamos aplicar algumas transformações e tratamentos também vamos definir um schema final desejado para a tabela na silva.

In [5]:
# Lendo tabela de Pagamentos Trimestrais da Bronze

table_name = "b_pagamentos_trimestrais_bc"
dt_ref_carga= "2025-01-20"
# Fixando a partição a ser lida, se fosse algo produtivo, e via orquestrador como Control-M ou Airflow
# seria uma variável de ambiente com este valor

data_source_file_path = f"s3a://{bronze_path_dir}/{table_name}"
print(f"* Data Source Pagamentos Trimestrais: {data_source_file_path}")


df_pagamentos_trimestral_bronze = spark.read \
                                       .format("delta") \
                                       .load(data_source_file_path) \
                                       .where(col('dt_partition') == dt_ref_carga)


print("\n* Schema Original do arquivo origem")
df_pagamentos_trimestral_bronze.printSchema
df_pagamentos_trimestral_bronze.show(n=1, vertical=True, truncate=True)


* Data Source Pagamentos Trimestrais: s3a://bank-databr/bronze/b_pagamentos_trimestrais_bc


25/01/30 23:20:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.



* Schema Original do arquivo origem


                                                                                

-RECORD 0--------------------------------------
 @odata.context         | https://was-p.bcn... 
 value                  | [{2024-09-30, 152... 
 file_path              | s3a://bank-databr... 
 file_name              | data_18_01_2025_1... 
 file_size              | 19142                
 file_block_start       | 0                    
 file_block_length      | 19142                
 file_modification_time | 2025-01-18 20:40:39  
 ingestion_engine       | python_dlt           
 dt_partition           | 2025-01-20           
only showing top 1 row



#### Aplicando Transformações e Schema Final

In [6]:
# Vamos transformar a tabela original, explodindo os campos JSON/Struct para novas colunas e filtrando algumas variáveis

# Explodindo o valor da coluna "value" em múltiplas colunas
df_pagamentos_trimestral_transformed = df_pagamentos_trimestral_bronze.withColumn('value_struct', explode(col("value"))) \
                                                                      .select( col("value_struct.*") ) \
                                                                      .drop(*['@odata.context', 'value']) \
                                                                      .withColumn('dt_partition', lit(dt_ref_carga) )




# Schema Original do arquivo origem
df_pagamentos_trimestral_transformed.printSchema()
df_pagamentos_trimestral_transformed.show(n=3, vertical=False, truncate=True)


root
 |-- datatrimestre: string (nullable = true)
 |-- quantidadeBoleto: double (nullable = true)
 |-- quantidadeCartaoCredito: double (nullable = true)
 |-- quantidadeCartaoDebito: double (nullable = true)
 |-- quantidadeCartaoPrePago: double (nullable = true)
 |-- quantidadeCheque: double (nullable = true)
 |-- quantidadeConvenios: double (nullable = true)
 |-- quantidadeDOC: double (nullable = true)
 |-- quantidadeDebitoDireto: double (nullable = true)
 |-- quantidadePix: double (nullable = true)
 |-- quantidadeSaques: double (nullable = true)
 |-- quantidadeTEC: double (nullable = true)
 |-- quantidadeTED: double (nullable = true)
 |-- quantidadeTransIntrabancaria: double (nullable = true)
 |-- valorBoleto: double (nullable = true)
 |-- valorCartaoCredito: double (nullable = true)
 |-- valorCartaoDebito: double (nullable = true)
 |-- valorCartaoPrePago: double (nullable = true)
 |-- valorCheque: double (nullable = true)
 |-- valorConvenios: double (nullable = true)
 |-- valorDOC: d

                                                                                

+-------------+----------------+-----------------------+----------------------+-----------------------+----------------+-------------------+-------------+----------------------+-------------+----------------+-------------+-------------+----------------------------+-----------+------------------+-----------------+------------------+-----------+--------------+--------+-----------------+----------+-----------+--------+-------------+-----------------------+------------+
|datatrimestre|quantidadeBoleto|quantidadeCartaoCredito|quantidadeCartaoDebito|quantidadeCartaoPrePago|quantidadeCheque|quantidadeConvenios|quantidadeDOC|quantidadeDebitoDireto|quantidadePix|quantidadeSaques|quantidadeTEC|quantidadeTED|quantidadeTransIntrabancaria|valorBoleto|valorCartaoCredito|valorCartaoDebito|valorCartaoPrePago|valorCheque|valorConvenios|valorDOC|valorDebitoDireto|  valorPix|valorSaques|valorTEC|     valorTED|valorTransIntrabancaria|dt_partition|
+-------------+----------------+-----------------------+--

#### Data Quality

Vamos verificar o schema inferido validando os tipos dos dados e valores carregados



In [7]:

from pyspark.sql.functions import count as _count, when

def resumo_estatistico(df: DataFrame) -> DataFrame:
    """ Gera um resumo com metadados e estatíticas básicas sobre o DataFrame """

    # Recuperando o Nome e tipos de cada Coluna
    column_types = df.dtypes

    # Recupera a quantidade de valores Nulos/Empty/Missing para cada coluna

    null_counts = df.select([
                                _count(
                                    when(col(column_name).isNull(), column_name)
                                ).alias(column_name)
                                for column_name in df.columns
                            ]).collect()[0]
    
    # Montando o resumo final
    resumo = [
        (column_name, dtype, null_counts[column_name])
        for column_name, dtype in column_types
    ]

    # Dataframe resposta
    schema = ["ColumnName", "DataType", "CountMissings"]

    return spark.createDataFrame(data = resumo, schema = schema)




df_statistics = resumo_estatistico(df_pagamentos_trimestral_transformed)

df_statistics.show(truncate=False)

                                                                                

+----------------------------+--------+-------------+
|ColumnName                  |DataType|CountMissings|
+----------------------------+--------+-------------+
|datatrimestre               |string  |0            |
|quantidadeBoleto            |double  |0            |
|quantidadeCartaoCredito     |double  |0            |
|quantidadeCartaoDebito      |double  |0            |
|quantidadeCartaoPrePago     |double  |0            |
|quantidadeCheque            |double  |0            |
|quantidadeConvenios         |double  |0            |
|quantidadeDOC               |double  |0            |
|quantidadeDebitoDireto      |double  |0            |
|quantidadePix               |double  |0            |
|quantidadeSaques            |double  |0            |
|quantidadeTEC               |double  |0            |
|quantidadeTED               |double  |0            |
|quantidadeTransIntrabancaria|double  |0            |
|valorBoleto                 |double  |0            |
|valorCartaoCredito         

                                                                                

#### Data Quality com Great Expectations

Vamos usar o Great Expectations para de forma padronizada, validar os intervaloes e valores das coluns em nosso Dataframe

1. Check for duplicates
2. Check for unique values in columns
3. Check for missing values
4. Categorical value distributions
5. Schema validation
6. Temporal consistency check
7. Cross-field validation
8. Dependency check


In [8]:

import great_expectations as gx
import great_expectations.expectations as gxe


In [10]:
# 01. Criando o contexto
context = gx.get_context()


# Configurando fonte de dados
datasource = context.data_sources.add_spark(name = "pagamentos_trimestrais")

In [11]:
# Configurando asset
data_asset = datasource.add_dataframe_asset(name = "pagamentos_trimestrais_asset")

In [12]:
# Criando Definições Batch

batch_definition = data_asset.add_batch_definition_whole_dataframe("batch_pagamentos_trimestrais")

In [13]:
# Adicionando dataframe como parâmetro

batch_parameters = {"dataframe": df_pagamentos_trimestral_transformed}

batch = batch_definition.get_batch(batch_parameters= batch_parameters)

In [14]:
expectation = gx.expectations.ExpectColumnValuesToBeBetween(
    column = "quantidadeBoleto",
    min_value = 1,
    max_value = 100
)


validation_result = batch.validate(expectation)

Calculating Metrics: 100%|██████████| 13/13 [00:02<00:00,  4.85it/s]            


In [15]:
print(validation_result)

{
  "success": false,
  "expectation_config": {
    "type": "expect_column_values_to_be_between",
    "kwargs": {
      "batch_id": "pagamentos_trimestrais-pagamentos_trimestrais_asset",
      "column": "quantidadeBoleto",
      "min_value": 1.0,
      "max_value": 100.0
    },
    "meta": {}
  },
  "result": {
    "element_count": 161,
    "unexpected_count": 161,
    "unexpected_percent": 100.0,
    "partial_unexpected_list": [
      1525449.9,
      1439119.22,
      1422709.26,
      1452479.44,
      1487093.97,
      1494217.09,
      1510864.63,
      1522483.73,
      1538367.11,
      1517092.79,
      1490934.41,
      1497704.81,
      1493638.16,
      1431427.43,
      1386080.6,
      1472868.82,
      1389215.55,
      1256812.05,
      1184510.67,
      1190397.8
    ],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 100.0,
    "unexpected_percent_nonmissing": 100.0,
    "partial_unexpected_counts": [
      {
        "value": 1184510.