## Template de Ingestão

## Pré-requisitos:

- Criar cluster com lib tipo PyPI: google-cloud-bigquery.
- Executar notebook para inserir "service account key file".
- Executar notebook de criação de tabelas delta.

In [0]:
from pyspark.sql.functions import col, sum, expr, count, row_number, lit, input_file_name
from pyspark.sql import DataFrame
from pyspark.sql.functions import col as spark_col, sum as spark_sum
from pyspark.sql.window import Window
from datetime import datetime
import logging
import os
from google.cloud import bigquery

In [0]:
# Configure logging: INFO level, records formatted as "<time> - <level> - <message>".
# getLogger() without a name returns the root logger, shared by every function below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

# Set the environment variable holding the service-account key file path.
# NOTE(review): "/tmp/your_key.json" looks like a placeholder — presumably the
# prerequisite notebook writes the real key file there; confirm before running.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/tmp/your_key.json"

In [0]:
#Funções Compartilhadas (Utilitárias)

# Configure access to GCP Storage
def conex_gcp():
    """Configure the Spark session to authenticate against Google Cloud Storage.

    Reads the key-file path from the ``GOOGLE_APPLICATION_CREDENTIALS``
    environment variable and stores it in the Spark configuration. Uses the
    notebook-provided global ``spark`` session and the module-level ``logger``.

    No connection is actually opened here; only configuration values are set.

    Returns:
        None. Failures are logged (with traceback) and swallowed so that the
        caller keeps its original best-effort behaviour.
    """
    try:
        service_account_key_file = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

        # Enable service-account authentication for the gs:// filesystem
        spark.conf.set("fs.gs.auth.service.account.enable", "true")
        spark.conf.set("google.cloud.auth.service.account.json.keyfile", service_account_key_file)

        logger.info("Connection successfully")

    except Exception:
        # logger.exception keeps the same message but also records the cause,
        # which the previous bare "Connection failed" line discarded.
        logger.exception("Connection failed")
    

# Insert metrics rows into BigQuery
def insert_bigquery(dict_metrics, table_id=None):
    """Stream the given metrics rows into a BigQuery table.

    Args:
        dict_metrics: rows to insert, passed unchanged to
            ``Client.insert_rows_json`` (the caller in this file passes a
            list containing one dict).
        table_id: fully-qualified destination table. Defaults to the
            ingestion metrics table used by this notebook.

    Returns:
        None. Every failure is logged and swallowed, so metrics publishing
        never aborts the ingestion that called it.
    """
    if table_id is None:
        table_id = "datamaster01.ingestion_metrics_data_master.ingestion_metrics_data_lake"

    logger.info("Opening connection with Big Query")
    try:
        client = bigquery.Client()
    except Exception:
        # Stop here: without a client the insert below could only fail with a
        # NameError that would be reported as a second, misleading error.
        logger.exception("Connection failed")
        return None
    logger.info("Connection successfully")

    logger.info(f"Inserting data into table Id: {table_id}")
    try:
        errors = client.insert_rows_json(table_id, dict_metrics)
    except Exception:
        logger.exception(f"Failed to insert data into table Id: {table_id}")
        return None

    if errors:
        # Per-row insert errors are failures, so log them at ERROR level
        logger.error(f"Erros ao inserir dados: {errors}")
    else:
        logger.info("Dados inseridos com sucesso.")
    return None


# Load the data from the root path
def load_data_ingestion(path, header, sep):
    """Read a CSV file into a DataFrame and stamp it with the load date.

    Uses the notebook-provided global ``spark`` session.

    Args:
        path: location of the CSV file to read.
        header: value for the CSV reader's ``header`` option.
        sep: value for the CSV reader's ``sep`` (field separator) option.

    Returns:
        The loaded DataFrame with an extra ``dat_ref_carga`` column holding
        today's date formatted as ``YYYYMMDD``.

    Raises:
        ValueError: if the parsed DataFrame has one column or fewer, which
            usually means the wrong separator was used. Previously this case
            was logged and ``None`` was returned, which made the caller fail
            later with an unrelated ``AttributeError``.
    """
    df = spark.read.format("csv").option("header", header).option("sep", sep).load(path)

    if len(df.columns) <= 1:
        error_message = "The DataFrame does not have more than one column. Check the separator used to read the file."
        # Show one row so the operator can see how the file was parsed
        df.show(1)
        logger.error(f"Data load failure: {error_message}")
        raise ValueError(error_message)

    dat_carga = datetime.now().strftime("%Y%m%d")
    df_dat = df.withColumn("dat_ref_carga", lit(dat_carga))

    logger.info("Data loaded successfully")
    return df_dat
    
# Check for null values
def check_nulls(df: DataFrame, required_columns: list):
    """Validate that the required columns exist and contain no nulls.

    Null counts are computed for every column of ``df``. Nulls in required
    columns are an error; nulls in any other column are only logged.

    Args:
        df: DataFrame to validate.
        required_columns: names of the columns that must exist in ``df`` and
            must not contain null values.

    Raises:
        ValueError: if a required column is missing from ``df`` or if a
            required column contains at least one null value.
    """
    # Every required column must be present in the DataFrame
    df_columns = set(df.columns)
    missing_columns = [name for name in required_columns if name not in df_columns]

    if missing_columns:
        raise ValueError(f"The following required columns are missing from the DataFrame: {', '.join(missing_columns)}")

    try:
        # One aggregation row holding the null count of each column
        null_counts = df.select([spark_sum(spark_col(c).isNull().cast("int")).alias(c) for c in df.columns])
        null_counts_dict = null_counts.collect()[0].asDict()

        # Keep only columns that have nulls. The truthiness test also skips
        # None, which is what the sum yields when the DataFrame has no rows
        # (comparing None > 0 would raise TypeError).
        nulls_info = {column: total for column, total in null_counts_dict.items() if total}

        # Required columns that contain nulls
        invalid_columns = {name: nulls_info[name] for name in required_columns if name in nulls_info}

        if invalid_columns:
            raise ValueError(f"Error: The following required columns contain null values: {invalid_columns}")

        # Past this point no required column is in nulls_info, so everything
        # left is an optional column: report it without failing.
        if nulls_info:
            logger.info("Other columns with null values (not required):")
            for column, total in nulls_info.items():
                logger.info(f"Column: {column}, Null count: {total}")

        logger.info("Mandatory columns are valid.")

    except ValueError as e:
        logger.error(e)
        raise


# Collect the elapsed execution time in seconds
def monitor_execution_time(start_time):
    """Return the number of seconds elapsed since ``start_time``.

    Args:
        start_time: naive ``datetime`` captured when the measured step began.

    Returns:
        Elapsed wall-clock time, in seconds, as a float.
    """
    elapsed = datetime.now() - start_time
    return elapsed.total_seconds()


# Strip leading/trailing whitespace from column names
def clean_column_names(df):
    """Return ``df`` with surrounding whitespace removed from every column name.

    Columns whose names are already clean are left untouched; when nothing
    needs renaming the input object itself is returned.

    Args:
        df: object exposing ``columns`` and ``withColumnRenamed(old, new)``.

    Returns:
        The DataFrame with cleaned column names.
    """
    # dict.fromkeys keeps first-seen order and visits each distinct name once
    for original in dict.fromkeys(df.columns):
        stripped = original.strip()
        if stripped != original:
            df = df.withColumnRenamed(original, stripped)

    return df


In [0]:
# Ingestion template function
def ingestion(db_name, table_name, odate, sep, required_columns):
    """Ingest one CSV file into a Delta table and publish the run metrics.

    Steps: configure GCS access, load ``{table_name}_{odate}.csv`` from the
    ingestion bucket, measure its size and row count, strip whitespace from
    column names, validate nulls, append the data to
    ``{db_name}.{table_name}``, re-read the rows stamped with today's load
    date to verify the insert, and send the collected metrics to BigQuery.

    Args:
        db_name: database (schema) of the destination table.
        table_name: destination table name; also the source folder and the
            file-name prefix.
        odate: date suffix of the source file name.
        sep: field separator of the CSV file.
        required_columns: columns that must exist and must not contain nulls.

    Returns:
        A one-element list with the metrics dict, or ``None`` if any step
        failed (the error is logged with its traceback).
    """
    try:
        # Start time of the whole run (metrics)
        start_time_total_execution = datetime.now()
        logger.info(f"Start of execution: {start_time_total_execution}")

        # Configure access to GCP Storage
        conex_gcp()

        # Load the data; the loader adds the load-date column
        path_load = f"gs://data-ingestion-bucket-datamaster/table_ingestion_files/{table_name}"
        logger.info(f"Starting to load data into the path {path_load}/{table_name}_{odate}")
        load_start_time = datetime.now()
        df = load_data_ingestion(f"{path_load}/{table_name}_{odate}.csv", header="true", sep=sep)
        if df is None:
            # Fail with a clear message instead of an AttributeError further down
            raise ValueError("load_data_ingestion returned no DataFrame; see the load error logged above")
        load_total_time = monitor_execution_time(load_start_time)
        logger.info(f"Total time to load data: {load_total_time} seconds")

        # Approximate size of the loaded data: summed length of each row's
        # string representation. sum() returns 0 for an empty RDD, whereas
        # reduce() raises ValueError.
        data_size_bytes = df.rdd.map(lambda row: len(str(row))).sum()
        data_size_mb = data_size_bytes / (1024 * 1024)
        data_size_mb_formatted = float(f"{data_size_mb:.2f}")
        logger.info(f"Size of loaded data: {data_size_mb_formatted} MB")

        # Number of rows loaded
        number_lines_loaded = df.count()
        logger.info(f"Number of lines loaded {number_lines_loaded}")

        # Strip whitespace from column names
        df_write_clean = clean_column_names(df)

        # Validate nulls in required and optional columns
        check_nulls(df_write_clean, required_columns)

        # Check whether the table exists (informational only; the write
        # below runs either way)
        try:
            if spark.catalog.tableExists(f"{db_name}.{table_name}"):
                logger.info(f"The table {db_name}.{table_name} exists.")
            else:
                logger.info(f"The table {db_name}.{table_name} does not exist.")
        except Exception as e:
            logger.error(f"An error occurred while checking the table: {e}")

        # Write the data to the table
        write_start_time = datetime.now()
        logger.info(f"Writing data to the table: {table_name}")
        df_write_clean.write.format("delta") \
            .mode("append") \
            .option("mergeSchema", "true") \
            .option("parquet.file.size", "128MB") \
            .saveAsTable(f"{db_name}.{table_name}")

        write_total_time = monitor_execution_time(write_start_time)
        logger.info(f"Data recording execution time: {write_total_time} seconds")

        # Verify how many rows the table holds for today's load date.
        # NOTE(review): the date is recomputed here; if the run crosses
        # midnight it will differ from the value stamped at load time and
        # the check will raise a false alert — confirm whether this matters.
        logger.info("Checking amount of data entered")
        dat_carga = datetime.now().strftime("%Y%m%d")
        df_verify = spark.read.format("delta").table(f"{db_name}.{table_name}").where(col("dat_ref_carga") == dat_carga)
        qtd_total_rows_insert = df_verify.count()
        num_columns_table = len(df_verify.columns)
        logger.info(f"A total of {qtd_total_rows_insert} rows and a total of {num_columns_table} columns were inserted into the table")

        # Count distinct files.
        # NOTE(review): input_file_name() is applied to the source DataFrame,
        # so this looks like the number of input files read rather than files
        # generated by the write — confirm the intended metric.
        logger.info("Checking total generated files")
        df_with_file_name = df.withColumn("file_name", input_file_name())
        num_files = df_with_file_name.select("file_name").distinct().count()
        logger.info(f"Total files generated: {num_files}")

        # End time of the whole run
        total_execution = monitor_execution_time(start_time_total_execution)
        final_time_total_execution = datetime.now()
        logger.info(f"End of execution: {final_time_total_execution}")
        logger.info(f"Total execution time: {total_execution}")

        # Alert when the rows found for the load date differ from the rows read
        alerta = number_lines_loaded != qtd_total_rows_insert
        logger.info(f"Table {table_name} ingested successfully")
        if alerta:
            logger.warning("Check table ingestion, has an ALERT regarding the difference in data found on the load date")
        else:
            logger.info("No alerts regarding validation of entered quantities")

        metricas = [{
            "table_name": table_name,
            "load_total_time": load_total_time,
            "number_lines_loaded": number_lines_loaded,
            "data_size_mb_formatted": data_size_mb_formatted,
            "write_total_time": write_total_time,
            "qtd_total_rows_insert": qtd_total_rows_insert,
            "num_columns_table": num_columns_table,
            "num_files": num_files,
            "total_execution": total_execution,
            "dat_carga": dat_carga,
            "alerta": alerta
        }]

        # Insert the metrics into the BigQuery metrics table
        logger.info("Inserting metrics data into Big Query")
        insert_bigquery(metricas)

        return metricas

    except Exception as e:
        # Same message as before, now with the traceback attached
        logger.exception(f"Error ingesting table {table_name}: {e}")
        return None


## Execução do Template de Ingestão

In [0]:
# Ingestion of the "clientes" table
# Variables expected by the load template
table_name_clientes = "clientes"
db_name_clientes = "cadastros"
odate_clientes = "20240909"  # date suffix of the source file name
sep = ";"  # CSV field separator for this file
required_columns = ['nome', 'cpf']  # columns that must exist and hold no nulls

# Run the ingestion template and keep the returned metrics
metricas_clientes = ingestion(db_name_clientes, table_name_clientes, odate_clientes, sep, required_columns)
print(metricas_clientes)


2024-09-07 18:31:00,257 - INFO - Start of execution: 2024-09-07 18:31:00.257934
2024-09-07 18:31:00,260 - INFO - Connection successfully
2024-09-07 18:31:00,265 - INFO - Starting to load data into the path gs://data-ingestion-bucket-datamaster/table_ingestion_files/clientes/clientes_20240909
2024-09-07 18:31:02,057 - INFO - Data loaded successfully
2024-09-07 18:31:02,059 - INFO - Total time to load data: 1.79276 seconds
2024-09-07 18:31:03,223 - INFO - Size of loaded data: 1.08 MB
2024-09-07 18:31:03,931 - INFO - Number of lines loaded 2000
2024-09-07 18:31:06,880 - INFO - Mandatory columns are valid.
2024-09-07 18:31:06,940 - INFO - The table cadastros.clientes exists.
2024-09-07 18:31:06,941 - INFO - Writing data to the table: clientes
2024-09-07 18:31:20,289 - INFO - Data recording execution time: 13.347942 seconds
2024-09-07 18:31:20,292 - INFO - Checking amount of data entered
2024-09-07 18:31:24,023 - INFO - A total of 2000 rows and a total of 21 columns were inserted into the table

[{'table_name': 'clientes', 'load_total_time': 1.79276, 'number_lines_loaded': 2000, 'data_size_mb_formatted': 1.08, 'write_total_time': 13.347942, 'qtd_total_rows_insert': 2000, 'num_columns_table': 21, 'num_files': 1, 'total_execution': 25.449438, 'dat_carga': '20240907', 'alerta': False}]


In [0]:
# Ingestion of the "produtos" table
# Variables expected by the load template
table_name_produtos = "produtos"
db_name_produtos = "cadastros"
odate_produtos = "20240909"  # date suffix of the source file name
sep = ","  # CSV field separator for this file
required_columns = ['id', 'nome', 'descricao', 'categoria']  # columns that must exist and hold no nulls

# Run the ingestion template and keep the returned metrics
metricas_produtos = ingestion(db_name_produtos, table_name_produtos, odate_produtos, sep, required_columns)
print(metricas_produtos)


2024-09-07 18:31:41,355 - INFO - Start of execution: 2024-09-07 18:31:41.355516
2024-09-07 18:31:41,360 - INFO - Connection successfully
2024-09-07 18:31:41,361 - INFO - Starting to load data into the path gs://data-ingestion-bucket-datamaster/table_ingestion_files/produtos/produtos_20240909
2024-09-07 18:31:42,811 - INFO - Data loaded successfully
2024-09-07 18:31:42,813 - INFO - Total time to load data: 1.449717 seconds
2024-09-07 18:31:43,405 - INFO - Size of loaded data: 0.02 MB
2024-09-07 18:31:44,014 - INFO - Number of lines loaded 50
2024-09-07 18:31:46,383 - INFO - Mandatory columns are valid.
2024-09-07 18:31:46,495 - INFO - The table cadastros.produtos exists.
2024-09-07 18:31:46,497 - INFO - Writing data to the table: produtos
2024-09-07 18:31:54,163 - INFO - Data recording execution time: 7.666322 seconds
2024-09-07 18:31:54,165 - INFO - Checking amount of data entered
2024-09-07 18:31:56,872 - INFO - A total of 50 rows and a total of 17 columns were inserted into the table

[{'table_name': 'produtos', 'load_total_time': 1.449717, 'number_lines_loaded': 50, 'data_size_mb_formatted': 0.02, 'write_total_time': 7.666322, 'qtd_total_rows_insert': 50, 'num_columns_table': 17, 'num_files': 1, 'total_execution': 16.762517, 'dat_carga': '20240907', 'alerta': False}]


In [0]:
# Ingestion of the "clientesxprod" table
# Variables expected by the load template
table_name_clientesxprod = "clientesxprod"
db_name_clientesxprod = "vendas"
odate_clientesxprod = "20240909"  # date suffix of the source file name
sep = ","  # CSV field separator for this file
required_columns = ['cliente_id', 'produto_id']  # columns that must exist and hold no nulls

# Run the ingestion template and keep the returned metrics
metricas_clientesxprod = ingestion(db_name_clientesxprod, table_name_clientesxprod, odate_clientesxprod, sep, required_columns)
print(metricas_clientesxprod)


2024-09-07 18:32:01,857 - INFO - Start of execution: 2024-09-07 18:32:01.857321
2024-09-07 18:32:01,861 - INFO - Connection successfully
2024-09-07 18:32:01,862 - INFO - Starting to load data into the path gs://data-ingestion-bucket-datamaster/table_ingestion_files/clientesxprod/clientesxprod_20240909
2024-09-07 18:32:03,238 - INFO - Data loaded successfully
2024-09-07 18:32:03,239 - INFO - Total time to load data: 1.375346 seconds
2024-09-07 18:32:03,884 - INFO - Size of loaded data: 0.58 MB
2024-09-07 18:32:04,512 - INFO - Number of lines loaded 5000
2024-09-07 18:32:05,426 - INFO - Mandatory columns are valid.
2024-09-07 18:32:05,449 - INFO - The table vendas.clientesxprod exists.
2024-09-07 18:32:05,450 - INFO - Writing data to the table: clientesxprod
2024-09-07 18:32:12,219 - INFO - Data recording execution time: 6.769233 seconds
2024-09-07 18:32:12,222 - INFO - Checking amount of data entered
2024-09-07 18:32:14,876 - INFO - A total of 5000 rows and a total of 5 columns were inserted into the table

[{'table_name': 'clientesxprod', 'load_total_time': 1.375346, 'number_lines_loaded': 5000, 'data_size_mb_formatted': 0.58, 'write_total_time': 6.769233, 'qtd_total_rows_insert': 5000, 'num_columns_table': 5, 'num_files': 1, 'total_execution': 14.119215, 'dat_carga': '20240907', 'alerta': False}]
